Skip to content

Commit

Permalink
Update MockBroker documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
horkhe committed Feb 15, 2016
1 parent 201ab73 commit ffff88f
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 10 deletions.
43 changes: 34 additions & 9 deletions mockbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,25 @@ const (

type requestHandlerFunc func(req *request) (res encoder)

// mockBroker is a mock Kafka broker. It consists of a TCP server on a
// kernel-selected localhost port that can accept many connections. It reads
// Kafka requests from that connection and passes them to the user specified
// handler function (see SetHandler) that generates respective responses. If
// the handler has not been explicitly specified then the broker returns
// responses set by the Returns function in the exact order they were provided.
// (if a response has a len of 0, nothing is sent, and the client request will
// timeout in this case).
// MockBroker is a mock Kafka broker that is used in unit tests. It is exposed
// to facilitate testing of higher level or specialized consumers and producers
// built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol,
// but rather provides a facility to do that. It takes care of the TCP
// transport, request unmarshaling, response marshaling, and makes it the test
// writer responsibility to program correct according to the Kafka API protocol
// MockBroker behaviour.
//
// When running tests with one of these, it is strongly recommended to specify
// MockBroker is implemented as a TCP server listening on a kernel-selected
// localhost port that can accept many connections. It reads Kafka requests
// from that connection and returns responses programmed by the SetHandlerByMap
// function. If a MockBroker receives a request that it has no programmed
// response for, then it returns nothing and the request times out.
//
// A set of MockRequest builders to define mappings used by MockBroker is
// provided by Sarama. But users can develop MockRequests of their own and use
// them along with or instead of the standard ones.
//
// When running tests with MockBroker it is strongly recommended to specify
// a timeout to `go test` so that if the broker hangs waiting for a response,
// the test panics.
//
Expand All @@ -50,15 +59,22 @@ type MockBroker struct {
lock sync.Mutex
}

// RequestResponse represents a Request/Response pair processed by MockBroker.
type RequestResponse struct {
Request requestBody
Response encoder
}

// SetLatency makes broker pause for the specified period every time before
// replying.
func (b *MockBroker) SetLatency(latency time.Duration) {
b.latency = latency
}

// SetHandlerByMap defines mapping of Request types to MockResponses. When a
// request is received by the broker, it looks up the request type in the map
// and uses the found MockResponse instance to generate an appropriate reply.
// If the request type is not found in the map then nothing is sent.
func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
b.setHandler(func(req *request) (res encoder) {
reqTypeName := reflect.TypeOf(req.body).Elem().Name()
Expand All @@ -70,10 +86,15 @@ func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
})
}

// BrokerID returns broker ID assigned to the broker.
func (b *MockBroker) BrokerID() int32 {
return b.brokerID
}

// History returns a slice of RequestResponse pairs in the order they were
// processed by the broker. Note that in case of multiple connections to the
// broker the order expected by a test can be different from the order recorded
// in the history, unless some synchronization is implemented in the test.
func (b *MockBroker) History() []RequestResponse {
b.lock.Lock()
history := make([]RequestResponse, len(b.history))
Expand All @@ -82,14 +103,18 @@ func (b *MockBroker) History() []RequestResponse {
return history
}

// Port returns the TCP port number the broker is listening for requests on.
func (b *MockBroker) Port() int32 {
return b.port
}

// Addr returns the broker connection string in the form "<address>:<port>".
func (b *MockBroker) Addr() string {
return b.listener.Addr().String()
}

// Close terminates the broker blocking until it stops internal goroutines and
// releases all resources.
func (b *MockBroker) Close() {
close(b.expectations)
if len(b.expectations) > 0 {
Expand Down
3 changes: 2 additions & 1 deletion mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
)

// MockResponse is a response builder interface it defines one method that
// allows generating a response based on a request body.
// allows generating a response based on a request body. MockResponses are used
// to program behavior of MockBroker in tests.
type MockResponse interface {
For(reqBody decoder) (res encoder)
}
Expand Down

0 comments on commit ffff88f

Please sign in to comment.