Skip to content

Commit

Permalink
make sure mockbroker can handle different reponse header versions
Browse files Browse the repository at this point in the history
  • Loading branch information
d-rk authored and sladkoff committed Feb 20, 2020
1 parent 9ef5c3b commit 023606a
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 44 deletions.
2 changes: 1 addition & 1 deletion async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
lastBatchFirstSeq := -1
lastBatchSize := -1
lastSequenceWrittenToDisk := -1
handlerFailBeforeWrite := func(req *request) (res encoder) {
handlerFailBeforeWrite := func(req *request) (res encoderWithHeader) {
switch req.body.key() {
case 3:
return metadataResponse
Expand Down
4 changes: 2 additions & 2 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1225,7 +1225,7 @@ func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, e
}

header := responseHeader{}
err = versionedDecode(buf, &header, 1)
err = versionedDecode(buf, &header, 0)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1314,7 +1314,7 @@ func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correl
}

header := responseHeader{}
err = versionedDecode(buf, &header, 1)
err = versionedDecode(buf, &header, 0)
if err != nil {
return bytesRead, err
}
Expand Down
4 changes: 4 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func (m mockEncoder) encode(pe packetEncoder) error {
return pe.putRawBytes(m.bytes)
}

func (m mockEncoder) headerVersion() int16 {
return 0
}

type brokerMetrics struct {
bytesRead int
bytesWritten int
Expand Down
5 changes: 5 additions & 0 deletions encoder_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ type encoder interface {
encode(pe packetEncoder) error
}

type encoderWithHeader interface {
encoder
headerVersion() int16
}

// Encode takes an Encoder and turns it into bytes while potentially recording metrics.
func encode(e encoder, metricRegistry metrics.Registry) ([]byte, error) {
if e == nil {
Expand Down
35 changes: 26 additions & 9 deletions mockbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (

type GSSApiHandlerFunc func([]byte) []byte

type requestHandlerFunc func(req *request) (res encoder)
type requestHandlerFunc func(req *request) (res encoderWithHeader)

// RequestNotifierFunc is invoked when a mock broker processes a request successfully
// and will provides the number of bytes read and written.
Expand Down Expand Up @@ -55,7 +55,7 @@ type MockBroker struct {
port int32
closing chan none
stopper chan none
expectations chan encoder
expectations chan encoderWithHeader
listener net.Listener
t TestReporter
latency time.Duration
Expand Down Expand Up @@ -83,7 +83,7 @@ func (b *MockBroker) SetLatency(latency time.Duration) {
// and uses the found MockResponse instance to generate an appropriate reply.
// If the request type is not found in the map then nothing is sent.
func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
b.setHandler(func(req *request) (res encoder) {
b.setHandler(func(req *request) (res encoderWithHeader) {
reqTypeName := reflect.TypeOf(req.body).Elem().Name()
mockResponse := handlerMap[reqTypeName]
if mockResponse == nil {
Expand Down Expand Up @@ -231,7 +231,6 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W
}
}()

resHeader := make([]byte, 8)
var bytesWritten int
var bytesRead int
for {
Expand Down Expand Up @@ -281,8 +280,7 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W
continue
}

binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4))
binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID))
resHeader := b.encodeHeader(res.headerVersion(), req.correlationID, uint32(len(encodedRes)))
if _, err = conn.Write(resHeader); err != nil {
b.serverError(err)
break
Expand Down Expand Up @@ -318,7 +316,26 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W
Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
}

func (b *MockBroker) defaultRequestHandler(req *request) (res encoder) {
func (b *MockBroker) encodeHeader(headerVersion int16, correlationId int32, payloadLength uint32) []byte {

headerLength := uint32(8)

if headerVersion >= 1 {
headerLength = 9
}

resHeader := make([]byte, headerLength)
binary.BigEndian.PutUint32(resHeader, payloadLength+headerLength-4)
binary.BigEndian.PutUint32(resHeader[4:], uint32(correlationId))

if headerVersion >= 1 {
binary.PutUvarint(resHeader[8:], 0)
}

return resHeader
}

func (b *MockBroker) defaultRequestHandler(req *request) (res encoderWithHeader) {
select {
case res, ok := <-b.expectations:
if !ok {
Expand Down Expand Up @@ -373,7 +390,7 @@ func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener
stopper: make(chan none),
t: t,
brokerID: brokerID,
expectations: make(chan encoder, 512),
expectations: make(chan encoderWithHeader, 512),
listener: listener,
}
broker.handler = broker.defaultRequestHandler
Expand All @@ -394,6 +411,6 @@ func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener
return broker
}

func (b *MockBroker) Returns(e encoder) {
func (b *MockBroker) Returns(e encoderWithHeader) {
b.expectations <- e
}
58 changes: 29 additions & 29 deletions mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ type TestReporter interface {
// allows generating a response based on a request body. MockResponses are used
// to program behavior of MockBroker in tests.
type MockResponse interface {
For(reqBody versionedDecoder) (res encoder)
For(reqBody versionedDecoder) (res encoderWithHeader)
}

// MockWrapper is a mock response builder that returns a particular concrete
// response regardless of the actual request passed to the `For` method.
type MockWrapper struct {
res encoder
res encoderWithHeader
}

func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) {
func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader) {
return mw.res
}

func NewMockWrapper(res encoder) *MockWrapper {
func NewMockWrapper(res encoderWithHeader) *MockWrapper {
return &MockWrapper{res: res}
}

Expand All @@ -50,7 +50,7 @@ func NewMockSequence(responses ...interface{}) *MockSequence {
switch res := res.(type) {
case MockResponse:
ms.responses[i] = res
case encoder:
case encoderWithHeader:
ms.responses[i] = NewMockWrapper(res)
default:
panic(fmt.Sprintf("Unexpected response type: %T", res))
Expand All @@ -59,7 +59,7 @@ func NewMockSequence(responses ...interface{}) *MockSequence {
return ms
}

func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {
func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader) {
res = mc.responses[0].For(reqBody)
if len(mc.responses) > 1 {
mc.responses = mc.responses[1:]
Expand All @@ -79,7 +79,7 @@ func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
}
}

func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoder {
func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
request := reqBody.(*ListGroupsRequest)
_ = request
response := &ListGroupsResponse{
Expand Down Expand Up @@ -110,7 +110,7 @@ func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, descrip
return m
}

func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoder {
func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
request := reqBody.(*DescribeGroupsRequest)

response := &DescribeGroupsResponse{}
Expand Down Expand Up @@ -166,7 +166,7 @@ func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResp
return mmr
}

func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
metadataRequest := reqBody.(*MetadataRequest)
metadataResponse := &MetadataResponse{
Version: metadataRequest.version(),
Expand Down Expand Up @@ -233,7 +233,7 @@ func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, of
return mor
}

func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder {
func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
offsetRequest := reqBody.(*OffsetRequest)
offsetResponse := &OffsetResponse{Version: mor.version}
for topic, partitions := range offsetRequest.blocks {
Expand Down Expand Up @@ -309,7 +309,7 @@ func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, of
return mfr
}

func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder {
func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
fetchRequest := reqBody.(*FetchRequest)
res := &FetchResponse{
Version: mfr.version,
Expand Down Expand Up @@ -393,7 +393,7 @@ func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *M
return mr
}

func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder {
func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*ConsumerMetadataRequest)
group := req.ConsumerGroup
res := &ConsumerMetadataResponse{}
Expand Down Expand Up @@ -442,7 +442,7 @@ func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType,
return mr
}

func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder {
func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*FindCoordinatorRequest)
res := &FindCoordinatorResponse{}
var v interface{}
Expand Down Expand Up @@ -489,7 +489,7 @@ func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int3
return mr
}

func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder {
func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*OffsetCommitRequest)
group := req.ConsumerGroup
res := &OffsetCommitResponse{}
Expand Down Expand Up @@ -546,7 +546,7 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE
return mr
}

func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*ProduceRequest)
res := &ProduceResponse{
Version: mr.version,
Expand Down Expand Up @@ -605,7 +605,7 @@ func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchRespo
return mr
}

func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*OffsetFetchRequest)
group := req.ConsumerGroup
res := &OffsetFetchResponse{Version: req.Version}
Expand All @@ -630,7 +630,7 @@ func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
return &MockCreateTopicsResponse{t: t}
}

func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*CreateTopicsRequest)
res := &CreateTopicsResponse{
Version: req.Version,
Expand Down Expand Up @@ -659,7 +659,7 @@ func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
return &MockDeleteTopicsResponse{t: t}
}

func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder {
func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*DeleteTopicsRequest)
res := &DeleteTopicsResponse{}
res.TopicErrorCodes = make(map[string]KError)
Expand All @@ -679,7 +679,7 @@ func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsRespon
return &MockCreatePartitionsResponse{t: t}
}

func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder {
func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*CreatePartitionsRequest)
res := &CreatePartitionsResponse{}
res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
Expand All @@ -706,7 +706,7 @@ func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartit
return &MockAlterPartitionReassignmentsResponse{t: t}
}

func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoder {
func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*AlterPartitionReassignmentsRequest)
_ = req
res := &AlterPartitionReassignmentsResponse{}
Expand All @@ -721,7 +721,7 @@ func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
return &MockDeleteRecordsResponse{t: t}
}

func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder {
func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*DeleteRecordsRequest)
res := &DeleteRecordsResponse{}
res.Topics = make(map[string]*DeleteRecordsResponseTopic)
Expand All @@ -744,7 +744,7 @@ func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse
return &MockDescribeConfigsResponse{t: t}
}

func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*DescribeConfigsRequest)
res := &DescribeConfigsResponse{
Version: req.Version,
Expand Down Expand Up @@ -835,7 +835,7 @@ func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
return &MockAlterConfigsResponse{t: t}
}

func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder {
func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*AlterConfigsRequest)
res := &AlterConfigsResponse{}

Expand All @@ -856,7 +856,7 @@ func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
return &MockCreateAclsResponse{t: t}
}

func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder {
func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*CreateAclsRequest)
res := &CreateAclsResponse{}

Expand All @@ -874,7 +874,7 @@ func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
return &MockListAclsResponse{t: t}
}

func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder {
func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*DescribeAclsRequest)
res := &DescribeAclsResponse{}
res.Err = ErrNoError
Expand Down Expand Up @@ -916,7 +916,7 @@ func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateRespon
return &MockSaslAuthenticateResponse{t: t}
}

func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoder {
func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader {
res := &SaslAuthenticateResponse{}
res.Err = msar.kerror
res.SaslAuthBytes = msar.saslAuthBytes
Expand Down Expand Up @@ -947,7 +947,7 @@ func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
return &MockSaslHandshakeResponse{t: t}
}

func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoder {
func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader {
res := &SaslHandshakeResponse{}
res.Err = mshr.kerror
res.EnabledMechanisms = mshr.enabledMechanisms
Expand All @@ -968,7 +968,7 @@ func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
return &MockDeleteAclsResponse{t: t}
}

func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder {
func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*DeleteAclsRequest)
res := &DeleteAclsResponse{}

Expand All @@ -994,7 +994,7 @@ func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDelete
return m
}

func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoder {
func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
resp := &DeleteGroupsResponse{
GroupErrorCodes: map[string]KError{},
}
Expand Down
Loading

0 comments on commit 023606a

Please sign in to comment.