Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check the received value in producers (synch/asych) mocks agains a regexp #687

Merged
merged 3 commits into from
Jul 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 40 additions & 8 deletions mocks/async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (

// AsyncProducer implements sarama's Producer interface for testing purposes.
// Before you can send messages to it's Input channel, you have to set expectations
// so it knows how to handle the input. This way you can easily test success and
// failure scenarios.
// so it knows how to handle the input; it returns an error if the number of messages
// received is bigger then the number of expectations set. You can also set a
// function in each expectation so that the message value is checked by this function
// and an error is returned if the match fails.
type AsyncProducer struct {
l sync.Mutex
t ErrorReporter
Expand Down Expand Up @@ -52,6 +54,18 @@ func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer {
} else {
expectation := mp.expectations[0]
mp.expectations = mp.expectations[1:]
if expectation.CheckFunction != nil {
if val, err := msg.Value.Encode(); err != nil {
mp.t.Errorf("Input message encoding failed: %s", err.Error())
mp.errors <- &sarama.ProducerError{Err: err, Msg: msg}
} else {
err = expectation.CheckFunction(val)
if err != nil {
mp.t.Errorf("Check function returned an error: %s", err.Error())
mp.errors <- &sarama.ProducerError{Err: err, Msg: msg}
}
}
}
if expectation.Result == errProduceSuccess {
mp.lastOffset++
if config.Producer.Return.Successes {
Expand Down Expand Up @@ -122,21 +136,39 @@ func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError {
// Setting expectations
////////////////////////////////////////////////

// ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message
// will be provided on the input channel. The mock producer will call the given function to check
// the message value. If an error is returned it will be made available on the Errors channel
// otherwise the mock will handle the message as if it produced successfully, i.e. it will make
// it available on the Successes channel if the Producer.Return.Successes setting is set to true.
func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) {
mp.l.Lock()
defer mp.l.Unlock()
mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf})
}

// ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message
// will be provided on the input channel. The mock producer will first call the given function to
// check the message value. If an error is returned it will be made available on the Errors channel
// otherwise the mock will handle the message as if it failed to produce successfully. This means
// it will make a ProducerError available on the Errors channel.
func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) {
mp.l.Lock()
defer mp.l.Unlock()
mp.expectations = append(mp.expectations, &producerExpectation{Result: err, CheckFunction: cf})
}

// ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided
// on the input channel. The mock producer will handle the message as if it is produced successfully,
// i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting
// is set to true.
func (mp *AsyncProducer) ExpectInputAndSucceed() {
mp.l.Lock()
defer mp.l.Unlock()
mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess})
mp.ExpectInputWithCheckerFunctionAndSucceed(nil)
}

// ExpectInputAndFail sets an expectation on the mock producer that a message will be provided
// on the input channel. The mock producer will handle the message as if it failed to produce
// successfully. This means it will make a ProducerError available on the Errors channel.
func (mp *AsyncProducer) ExpectInputAndFail(err error) {
mp.l.Lock()
defer mp.l.Unlock()
mp.expectations = append(mp.expectations, &producerExpectation{Result: err})
mp.ExpectInputWithCheckerFunctionAndFail(nil, err)
}
23 changes: 23 additions & 0 deletions mocks/async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mocks

import (
"fmt"
"strings"
"testing"

"github.com/Shopify/sarama"
Expand Down Expand Up @@ -92,3 +93,25 @@ func TestProducerWithTooManyExpectations(t *testing.T) {
t.Error("Expected to report an error")
}
}

func TestProducerWithCheckerFunction(t *testing.T) {
trm := newTestReporterMock()
mp := NewAsyncProducer(trm, nil)
mp.ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes"))
mp.ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$"))

mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
if err := mp.Close(); err != nil {
t.Error(err)
}

if len(mp.Errors()) != 1 {
t.Error("Expected to report an error")
}

err1 := <-mp.Errors()
if !strings.HasPrefix(err1.Err.Error(), "No match") {
t.Error("Expected to report a value check error, found: ", err1.Err)
}
}
23 changes: 22 additions & 1 deletion mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ package mocks

import (
"errors"
"fmt"
"regexp"

"github.com/Shopify/sarama"
)
Expand All @@ -25,6 +27,24 @@ type ErrorReporter interface {
Errorf(string, ...interface{})
}

// ValueChecker is a function type to be set in each expectation of the producer mocks
// to check the value passed.
type ValueChecker func(val []byte) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could use a (brief) godoc comment


// This function is used inside the mocks unit tests to generate ValueCheckers
func generateRegexpChecker(re string) func([]byte) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not exported, so if you want to actually use it in your app you won't be able to... I think it might just be easier to implement this directly in your app instead of here anyway

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put this there on purpuse just for the unit tests, thanks.

return func(val []byte) error {
matched, err := regexp.MatchString(re, string(val))
if err != nil {
return errors.New("Error while trying to match the input message with the expected pattern: " + err.Error())
}
if !matched {
return fmt.Errorf("No match between input value \"%s\" and expected pattern \"%s\"", val, re)
}
return nil
}
}

var (
errProduceSuccess error = nil
errOutOfExpectations = errors.New("No more expectations set on mock")
Expand All @@ -34,7 +54,8 @@ var (
const AnyOffset int64 = -1000

type producerExpectation struct {
Result error
Result error
CheckFunction ValueChecker
}

type consumerExpectation struct {
Expand Down
45 changes: 37 additions & 8 deletions mocks/sync_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer {

// SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation.
// You have to set expectations on the mock producer before calling SendMessage, so it knows
// how to handle them. If there is no more remaining expectations when SendMessage is called,
// how to handle them. You can set a function in each expectation so that the message value
// checked by this function and an error is returned if the match fails.
// If there is no more remaining expectation when SendMessage is called,
// the mock producer will write an error to the test state object.
func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
sp.l.Lock()
Expand All @@ -43,7 +45,18 @@ func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int3
if len(sp.expectations) > 0 {
expectation := sp.expectations[0]
sp.expectations = sp.expectations[1:]

if expectation.CheckFunction != nil {
if val, err := msg.Value.Encode(); err != nil {
sp.t.Errorf("Input message encoding failed: %s", err.Error())
return -1, -1, err
} else {
err := expectation.CheckFunction(val)
if err != nil {
sp.t.Errorf("Check function returned an error: %s", err.Error())
return -1, -1, err
}
}
}
if expectation.Result == errProduceSuccess {
sp.lastOffset++
msg.Offset = sp.lastOffset
Expand Down Expand Up @@ -75,20 +88,36 @@ func (sp *SyncProducer) Close() error {
// Setting expectations
////////////////////////////////////////////////

// ExpectSendMessageWithCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage
// will be called. The mock producer will first call the given function to check the message value.
// It will cascade the error of the function, if any, or handle the message as if it produced
// successfully, i.e. by returning a valid partition, and offset, and a nil error.
func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) {
sp.l.Lock()
defer sp.l.Unlock()
sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf})
}

// ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be
// called. The mock producer will first call the given function to check the message value.
// It will cascade the error of the function, if any, or handle the message as if it failed
// to produce successfully, i.e. by returning the provided error.
func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) {
sp.l.Lock()
defer sp.l.Unlock()
sp.expectations = append(sp.expectations, &producerExpectation{Result: err, CheckFunction: cf})
}

// ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be
// called. The mock producer will handle the message as if it produced successfully, i.e. by
// returning a valid partition, and offset, and a nil error.
func (sp *SyncProducer) ExpectSendMessageAndSucceed() {
sp.l.Lock()
defer sp.l.Unlock()
sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess})
sp.ExpectSendMessageWithCheckerFunctionAndSucceed(nil)
}

// ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be
// called. The mock producer will handle the message as if it failed to produce
// successfully, i.e. by returning the provided error.
func (sp *SyncProducer) ExpectSendMessageAndFail(err error) {
sp.l.Lock()
defer sp.l.Unlock()
sp.expectations = append(sp.expectations, &producerExpectation{Result: err})
sp.ExpectSendMessageWithCheckerFunctionAndFail(nil, err)
}
26 changes: 26 additions & 0 deletions mocks/sync_producer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mocks

import (
"strings"
"testing"

"github.com/Shopify/sarama"
Expand Down Expand Up @@ -96,3 +97,28 @@ func TestSyncProducerWithTooFewExpectations(t *testing.T) {
t.Error("Expected to report an error")
}
}

func TestSyncProducerWithCheckerFunction(t *testing.T) {
trm := newTestReporterMock()

sp := NewSyncProducer(trm, nil)
sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes"))
sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$"))

msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
if _, _, err := sp.SendMessage(msg); err != nil {
t.Error("No error expected on first SendMessage call, found: ", err)
}
msg = &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
if _, _, err := sp.SendMessage(msg); err == nil || !strings.HasPrefix(err.Error(), "No match") {
t.Error("Error during value check expected on second SendMessage call, found:", err)
}

if err := sp.Close(); err != nil {
t.Error(err)
}

if len(trm.errors) != 1 {
t.Error("Expected to report an error")
}
}