diff --git a/mocks/sync_producer.go b/mocks/sync_producer.go index 5de79cce8..3f4986e2f 100644 --- a/mocks/sync_producer.go +++ b/mocks/sync_producer.go @@ -78,14 +78,25 @@ func (sp *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error { defer sp.l.Unlock() if len(sp.expectations) >= len(msgs) { - expectations := sp.expectations[0 : len(msgs)-1] + expectations := sp.expectations[0:len(msgs)] sp.expectations = sp.expectations[len(msgs):] - for _, expectation := range expectations { + for i, expectation := range expectations { + if expectation.CheckFunction != nil { + val, err := msgs[i].Value.Encode() + if err != nil { + sp.t.Errorf("Input message encoding failed: %s", err.Error()) + return err + } + errCheck := expectation.CheckFunction(val) + if errCheck != nil { + sp.t.Errorf("Check function returned an error: %s", errCheck.Error()) + return errCheck + } + } if expectation.Result != errProduceSuccess { return expectation.Result } - } return nil } diff --git a/mocks/sync_producer_test.go b/mocks/sync_producer_test.go index 0fdc99877..bf2c71a19 100644 --- a/mocks/sync_producer_test.go +++ b/mocks/sync_producer_test.go @@ -1,6 +1,7 @@ package mocks import ( + "errors" "strings" "testing" @@ -122,3 +123,128 @@ func TestSyncProducerWithCheckerFunction(t *testing.T) { t.Error("Expected to report an error") } } + +func TestSyncProducerWithCheckerFunctionForSendMessagesWithError(t *testing.T) { + trm := newTestReporterMock() + + sp := NewSyncProducer(trm, nil) + sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) + sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$")) + + msg1 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} + msg2 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} + msgs := []*sarama.ProducerMessage{msg1, msg2} + + if err := sp.SendMessages(msgs); err == nil || !strings.HasPrefix(err.Error(), "No match") { + t.Error("Error during value check expected on second message, found: ", err) + } + + if err := sp.Close(); err != nil { + t.Error(err) + } + + if len(trm.errors) != 1 { + t.Error("Expected to report an error") + } +} + +func TestSyncProducerWithCheckerFunctionForSendMessagesWithoutError(t *testing.T) { + trm := newTestReporterMock() + + sp := NewSyncProducer(trm, nil) + sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) + + msg1 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} + msgs := []*sarama.ProducerMessage{msg1} + + if err := sp.SendMessages(msgs); err != nil { + t.Error("No error expected on SendMessages call, found: ", err) + } + + if err := sp.Close(); err != nil { + t.Error(err) + } + + if len(trm.errors) != 0 { + t.Errorf("Expected to not report any errors, found: %v", trm.errors) + } +} + +func TestSyncProducerSendMessagesExpectationsMismatchTooFew(t *testing.T) { + trm := newTestReporterMock() + + sp := NewSyncProducer(trm, nil) + sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) + + msg1 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} + msg2 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} + + msgs := []*sarama.ProducerMessage{msg1, msg2} + + if err := sp.SendMessages(msgs); err == nil { + t.Error("Error during value check expected on second message, found: ", err) + } + + if err := sp.Close(); err != nil { + t.Error(err) + } + + if len(trm.errors) != 2 { + t.Error("Expected to report 2 errors") + } +} + +func TestSyncProducerSendMessagesExpectationsMismatchTooMany(t *testing.T) { + trm := newTestReporterMock() + + sp := NewSyncProducer(trm, nil) + sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) + sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) + + msg1 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} + msgs := []*sarama.ProducerMessage{msg1} + + if err := sp.SendMessages(msgs); err != nil { + t.Error("No error expected on SendMessages call, found: ", err) + } + + if err := sp.Close(); err != nil { + t.Error(err) + } + + if len(trm.errors) != 1 { + t.Error("Expected to report 1 errors") + } +} + +func TestSyncProducerSendMessagesFaultyEncoder(t *testing.T) { + trm := newTestReporterMock() + + sp := NewSyncProducer(trm, nil) + sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) + + msg1 := &sarama.ProducerMessage{Topic: "test", Value: faultyEncoder("123")} + msgs := []*sarama.ProducerMessage{msg1} + + if err := sp.SendMessages(msgs); err == nil || !strings.HasPrefix(err.Error(), "encode error") { + t.Error("Encoding error expected, found: ", err) + } + + if err := sp.Close(); err != nil { + t.Error(err) + } + + if len(trm.errors) != 1 { + t.Error("Expected to report 1 errors") + } +} + +type faultyEncoder []byte + +func (f faultyEncoder) Encode() ([]byte, error) { + return nil, errors.New("encode error") +} + +func (f faultyEncoder) Length() int { + return len(f) +}