Skip to content

Commit

Permalink
Merge pull request #687 from jacksilv/checkMessages
Browse files Browse the repository at this point in the history
Check the received value in producers (synch/asych) mocks agains a regexp
  • Loading branch information
eapache authored Jul 6, 2016
2 parents 15fe77a + b070127 commit 12de47f
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 17 deletions.
48 changes: 40 additions & 8 deletions mocks/async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (

// AsyncProducer implements sarama's Producer interface for testing purposes.
// Before you can send messages to it's Input channel, you have to set expectations
// so it knows how to handle the input. This way you can easily test success and
// failure scenarios.
// so it knows how to handle the input; it returns an error if the number of messages
// received is bigger then the number of expectations set. You can also set a
// function in each expectation so that the message value is checked by this function
// and an error is returned if the match fails.
type AsyncProducer struct {
l sync.Mutex
t ErrorReporter
Expand Down Expand Up @@ -52,6 +54,18 @@ func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer {
} else {
expectation := mp.expectations[0]
mp.expectations = mp.expectations[1:]
if expectation.CheckFunction != nil {
if val, err := msg.Value.Encode(); err != nil {
mp.t.Errorf("Input message encoding failed: %s", err.Error())
mp.errors <- &sarama.ProducerError{Err: err, Msg: msg}
} else {
err = expectation.CheckFunction(val)
if err != nil {
mp.t.Errorf("Check function returned an error: %s", err.Error())
mp.errors <- &sarama.ProducerError{Err: err, Msg: msg}
}
}
}
if expectation.Result == errProduceSuccess {
mp.lastOffset++
if config.Producer.Return.Successes {
Expand Down Expand Up @@ -122,21 +136,39 @@ func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError {
// Setting expectations
////////////////////////////////////////////////

// ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message
// will be provided on the input channel. The mock producer will call the given function to check
// the message value. If an error is returned it will be made available on the Errors channel
// otherwise the mock will handle the message as if it produced successfully, i.e. it will make
// it available on the Successes channel if the Producer.Return.Successes setting is set to true.
func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) {
mp.l.Lock()
defer mp.l.Unlock()
mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf})
}

// ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message
// will be provided on the input channel. The mock producer will first call the given function to
// check the message value. If an error is returned it will be made available on the Errors channel
// otherwise the mock will handle the message as if it failed to produce successfully. This means
// it will make a ProducerError available on the Errors channel.
func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) {
mp.l.Lock()
defer mp.l.Unlock()
mp.expectations = append(mp.expectations, &producerExpectation{Result: err, CheckFunction: cf})
}

// ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided
// on the input channel. The mock producer will handle the message as if it is produced successfully,
// i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting
// is set to true.
func (mp *AsyncProducer) ExpectInputAndSucceed() {
mp.l.Lock()
defer mp.l.Unlock()
mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess})
mp.ExpectInputWithCheckerFunctionAndSucceed(nil)
}

// ExpectInputAndFail sets an expectation on the mock producer that a message will be provided
// on the input channel. The mock producer will handle the message as if it failed to produce
// successfully. This means it will make a ProducerError available on the Errors channel.
func (mp *AsyncProducer) ExpectInputAndFail(err error) {
mp.l.Lock()
defer mp.l.Unlock()
mp.expectations = append(mp.expectations, &producerExpectation{Result: err})
mp.ExpectInputWithCheckerFunctionAndFail(nil, err)
}
23 changes: 23 additions & 0 deletions mocks/async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mocks

import (
"fmt"
"strings"
"testing"

"github.com/Shopify/sarama"
Expand Down Expand Up @@ -92,3 +93,25 @@ func TestProducerWithTooManyExpectations(t *testing.T) {
t.Error("Expected to report an error")
}
}

func TestProducerWithCheckerFunction(t *testing.T) {
trm := newTestReporterMock()
mp := NewAsyncProducer(trm, nil)
mp.ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes"))
mp.ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$"))

mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
if err := mp.Close(); err != nil {
t.Error(err)
}

if len(mp.Errors()) != 1 {
t.Error("Expected to report an error")
}

err1 := <-mp.Errors()
if !strings.HasPrefix(err1.Err.Error(), "No match") {
t.Error("Expected to report a value check error, found: ", err1.Err)
}
}
23 changes: 22 additions & 1 deletion mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package mocks

import (
"errors"
"fmt"
"regexp"

"github.com/Shopify/sarama"
)
Expand All @@ -25,6 +27,24 @@ type ErrorReporter interface {
Errorf(string, ...interface{})
}

// ValueChecker is a function type to be set in each expectation of the producer mocks
// to check the value passed.
type ValueChecker func(val []byte) error

// This function is used inside the mocks unit tests to generate ValueCheckers
func generateRegexpChecker(re string) func([]byte) error {
return func(val []byte) error {
matched, err := regexp.MatchString(re, string(val))
if err != nil {
return errors.New("Error while trying to match the input message with the expected pattern: " + err.Error())
}
if !matched {
return fmt.Errorf("No match between input value \"%s\" and expected pattern \"%s\"", val, re)
}
return nil
}
}

var (
errProduceSuccess error = nil
errOutOfExpectations = errors.New("No more expectations set on mock")
Expand All @@ -34,7 +54,8 @@ var (
const AnyOffset int64 = -1000

type producerExpectation struct {
Result error
Result error
CheckFunction ValueChecker
}

type consumerExpectation struct {
Expand Down
45 changes: 37 additions & 8 deletions mocks/sync_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer {

// SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation.
// You have to set expectations on the mock producer before calling SendMessage, so it knows
// how to handle them. If there is no more remaining expectations when SendMessage is called,
// how to handle them. You can set a function in each expectation so that the message value
// checked by this function and an error is returned if the match fails.
// If there is no more remaining expectation when SendMessage is called,
// the mock producer will write an error to the test state object.
func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
sp.l.Lock()
Expand All @@ -43,7 +45,18 @@ func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int3
if len(sp.expectations) > 0 {
expectation := sp.expectations[0]
sp.expectations = sp.expectations[1:]

if expectation.CheckFunction != nil {
if val, err := msg.Value.Encode(); err != nil {
sp.t.Errorf("Input message encoding failed: %s", err.Error())
return -1, -1, err
} else {
err := expectation.CheckFunction(val)
if err != nil {
sp.t.Errorf("Check function returned an error: %s", err.Error())
return -1, -1, err
}
}
}
if expectation.Result == errProduceSuccess {
sp.lastOffset++
msg.Offset = sp.lastOffset
Expand Down Expand Up @@ -100,20 +113,36 @@ func (sp *SyncProducer) Close() error {
// Setting expectations
////////////////////////////////////////////////

// ExpectSendMessageWithCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage
// will be called. The mock producer will first call the given function to check the message value.
// It will cascade the error of the function, if any, or handle the message as if it produced
// successfully, i.e. by returning a valid partition, and offset, and a nil error.
func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) {
sp.l.Lock()
defer sp.l.Unlock()
sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf})
}

// ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be
// called. The mock producer will first call the given function to check the message value.
// It will cascade the error of the function, if any, or handle the message as if it failed
// to produce successfully, i.e. by returning the provided error.
func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) {
sp.l.Lock()
defer sp.l.Unlock()
sp.expectations = append(sp.expectations, &producerExpectation{Result: err, CheckFunction: cf})
}

// ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be
// called. The mock producer will handle the message as if it produced successfully, i.e. by
// returning a valid partition, and offset, and a nil error.
func (sp *SyncProducer) ExpectSendMessageAndSucceed() {
sp.l.Lock()
defer sp.l.Unlock()
sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess})
sp.ExpectSendMessageWithCheckerFunctionAndSucceed(nil)
}

// ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be
// called. The mock producer will handle the message as if it failed to produce
// successfully, i.e. by returning the provided error.
func (sp *SyncProducer) ExpectSendMessageAndFail(err error) {
sp.l.Lock()
defer sp.l.Unlock()
sp.expectations = append(sp.expectations, &producerExpectation{Result: err})
sp.ExpectSendMessageWithCheckerFunctionAndFail(nil, err)
}
26 changes: 26 additions & 0 deletions mocks/sync_producer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mocks

import (
"strings"
"testing"

"github.com/Shopify/sarama"
Expand Down Expand Up @@ -96,3 +97,28 @@ func TestSyncProducerWithTooFewExpectations(t *testing.T) {
t.Error("Expected to report an error")
}
}

func TestSyncProducerWithCheckerFunction(t *testing.T) {
trm := newTestReporterMock()

sp := NewSyncProducer(trm, nil)
sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes"))
sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$"))

msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
if _, _, err := sp.SendMessage(msg); err != nil {
t.Error("No error expected on first SendMessage call, found: ", err)
}
msg = &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
if _, _, err := sp.SendMessage(msg); err == nil || !strings.HasPrefix(err.Error(), "No match") {
t.Error("Error during value check expected on second SendMessage call, found:", err)
}

if err := sp.Close(); err != nil {
t.Error(err)
}

if len(trm.errors) != 1 {
t.Error("Expected to report an error")
}
}

0 comments on commit 12de47f

Please sign in to comment.