From ab79623e66e27b85d21730adb25cc548f980acc8 Mon Sep 17 00:00:00 2001 From: Joe Wreschnig Date: Tue, 1 Jun 2021 18:35:37 +0200 Subject: [PATCH] Allow checking the entire `ProducerMessage` in the mock producers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is done by introducing `MessageChecker`, a function which validates an entire `ProducerMessage`, and reimplementing the existing expectation interface on top of that. Since `producerExpectation` was not a public API, the only visible effect of this to users of the mock producers should be slightly different error messages when encoding fails. To better mimic the real producers’ behavior, this also runs partitioning before passing the message to the checker. By default all topics have 32 partitions but this can be configured per-topic in either producer via the embedded `TopicConfig`. (This does not allow mocking unavailable partitions; I don’t see a use case for this which can’t be better handled by unit testing the partitioner.) This may affect existing tests if someone was passing a configuration with a `Partitioner` that would fail on their test messages, but it seems more likely they want to know if this is the case (we did). This is useful for (at least) two reasons, - I would often like to test more than just the message value; services may need to produce to different topics or with specific partitioning schemes. In some cases valuable data may only be in the message key, or I want to test that a header and message body agree. - Since the topic usually also corresponds to the data format, exposing the topic makes it easier to collect messages of different types across different topics which may appear in known quantities but non-deterministic orders. Without the topic this is only possible by trying to content-sniff the format from the value, which leads to weaker and more fragile tests. --- mocks/async_producer.go | 84 +++++++++++++++++++++++----------- mocks/async_producer_test.go | 42 +++++++++++++++++ mocks/mocks.go | 58 +++++++++++++++++++++++- mocks/sync_producer.go | 87 +++++++++++++++++++++++++----------- mocks/sync_producer_test.go | 2 +- 5 files changed, 219 insertions(+), 54 deletions(-) diff --git a/mocks/async_producer.go b/mocks/async_producer.go index cbd911c63..d7a4860e4 100644 --- a/mocks/async_producer.go +++ b/mocks/async_producer.go @@ -10,8 +10,8 @@ import ( // Before you can send messages to it's Input channel, you have to set expectations // so it knows how to handle the input; it returns an error if the number of messages // received is bigger then the number of expectations set. You can also set a -// function in each expectation so that the message value is checked by this function -// and an error is returned if the match fails. +// function in each expectation so that the message is checked by this function and +// an error is returned if the match fails. type AsyncProducer struct { l sync.Mutex t ErrorReporter @@ -21,12 +21,13 @@ type AsyncProducer struct { successes chan *sarama.ProducerMessage errors chan *sarama.ProducerError lastOffset int64 + *TopicConfig } // NewAsyncProducer instantiates a new Producer mock. The t argument should // be the *testing.T instance of your test method. An error will be written to it if // an expectation is violated. The config argument is used to determine whether it -// should ack successes on the Successes channel. +// should ack successes on the Successes channel and to handle partitioning. func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer { if config == nil { config = sarama.NewConfig() @@ -38,6 +39,7 @@ func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer { input: make(chan *sarama.ProducerMessage, config.ChannelBufferSize), successes: make(chan *sarama.ProducerMessage, config.ChannelBufferSize), errors: make(chan *sarama.ProducerError, config.ChannelBufferSize), + TopicConfig: NewTopicConfig(), } go func() { @@ -47,7 +49,14 @@ func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer { close(mp.closed) }() + partitioners := make(map[string]sarama.Partitioner, 1) + for msg := range mp.input { + partitioner := partitioners[msg.Topic] + if partitioner == nil { + partitioner = config.Producer.Partitioner(msg.Topic) + partitioners[msg.Topic] = partitioner + } mp.l.Lock() if mp.expectations == nil || len(mp.expectations) == 0 { mp.expectations = nil @@ -55,27 +64,30 @@ func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer { } else { expectation := mp.expectations[0] mp.expectations = mp.expectations[1:] - if expectation.CheckFunction != nil { - if val, err := msg.Value.Encode(); err != nil { - mp.t.Errorf("Input message encoding failed: %s", err.Error()) - mp.errors <- &sarama.ProducerError{Err: err, Msg: msg} - } else { - err = expectation.CheckFunction(val) + + partition, err := partitioner.Partition(msg, mp.partitions(msg.Topic)) + if err != nil { + mp.t.Errorf("Partitioner returned an error: %s", err.Error()) + mp.errors <- &sarama.ProducerError{Err: err, Msg: msg} + } else { + msg.Partition = partition + if expectation.CheckFunction != nil { + err := expectation.CheckFunction(msg) if err != nil { mp.t.Errorf("Check function returned an error: %s", err.Error()) mp.errors <- &sarama.ProducerError{Err: err, Msg: msg} } } - } - if expectation.Result == errProduceSuccess { - mp.lastOffset++ - if config.Producer.Return.Successes { - msg.Offset = mp.lastOffset - mp.successes <- msg - } - } else { - if config.Producer.Return.Errors { - mp.errors <- &sarama.ProducerError{Err: expectation.Result, Msg: msg} + if expectation.Result == errProduceSuccess { + mp.lastOffset++ + if config.Producer.Return.Successes { + msg.Offset = mp.lastOffset + mp.successes <- msg + } + } else { + if config.Producer.Return.Errors { + mp.errors <- &sarama.ProducerError{Err: expectation.Result, Msg: msg} + } } } } @@ -135,15 +147,35 @@ func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError { // Setting expectations //////////////////////////////////////////////// +// ExpectInputWithMessageCheckerFunctionAndSucceed sets an expectation on the mock producer that a +// message will be provided on the input channel. The mock producer will call the given function to +// check the message. If an error is returned it will be made available on the Errors channel +// otherwise the mock will handle the message as if it produced successfully, i.e. it will make it +// available on the Successes channel if the Producer.Return.Successes setting is set to true. +func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndSucceed(cf MessageChecker) { + mp.l.Lock() + defer mp.l.Unlock() + mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf}) +} + +// ExpectInputWithMessageCheckerFunctionAndFail sets an expectation on the mock producer that a +// message will be provided on the input channel. The mock producer will first call the given +// function to check the message. If an error is returned it will be made available on the Errors +// channel otherwise the mock will handle the message as if it failed to produce successfully. This +// means it will make a ProducerError available on the Errors channel. +func (mp *AsyncProducer) ExpectInputWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) { + mp.l.Lock() + defer mp.l.Unlock() + mp.expectations = append(mp.expectations, &producerExpectation{Result: err, CheckFunction: cf}) +} + // ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message // will be provided on the input channel. The mock producer will call the given function to check // the message value. If an error is returned it will be made available on the Errors channel // otherwise the mock will handle the message as if it produced successfully, i.e. it will make // it available on the Successes channel if the Producer.Return.Successes setting is set to true. func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) { - mp.l.Lock() - defer mp.l.Unlock() - mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf}) + mp.ExpectInputWithMessageCheckerFunctionAndSucceed(messageValueChecker(cf)) } // ExpectInputWithCheckerFunctionAndFail sets an expectation on the mock producer that a message @@ -152,9 +184,7 @@ func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecke // otherwise the mock will handle the message as if it failed to produce successfully. This means // it will make a ProducerError available on the Errors channel. func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) { - mp.l.Lock() - defer mp.l.Unlock() - mp.expectations = append(mp.expectations, &producerExpectation{Result: err, CheckFunction: cf}) + mp.ExpectInputWithMessageCheckerFunctionAndFail(messageValueChecker(cf), err) } // ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided @@ -162,12 +192,12 @@ func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, // i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting // is set to true. func (mp *AsyncProducer) ExpectInputAndSucceed() { - mp.ExpectInputWithCheckerFunctionAndSucceed(nil) + mp.ExpectInputWithMessageCheckerFunctionAndSucceed(nil) } // ExpectInputAndFail sets an expectation on the mock producer that a message will be provided // on the input channel. The mock producer will handle the message as if it failed to produce // successfully. This means it will make a ProducerError available on the Errors channel. func (mp *AsyncProducer) ExpectInputAndFail(err error) { - mp.ExpectInputWithCheckerFunctionAndFail(nil, err) + mp.ExpectInputWithMessageCheckerFunctionAndFail(nil, err) } diff --git a/mocks/async_producer_test.go b/mocks/async_producer_test.go index fc354232c..280823a2b 100644 --- a/mocks/async_producer_test.go +++ b/mocks/async_producer_test.go @@ -130,3 +130,45 @@ func TestProducerWithCheckerFunction(t *testing.T) { t.Error("Expected to report a value check error, found: ", err1.Err) } } + +func TestProducerWithBrokenPartitioner(t *testing.T) { + trm := newTestReporterMock() + config := sarama.NewConfig() + config.Producer.Partitioner = func(string) sarama.Partitioner { + return brokePartitioner{} + } + mp := NewAsyncProducer(trm, config) + mp.ExpectInputWithMessageCheckerFunctionAndSucceed(func(msg *sarama.ProducerMessage) error { + if msg.Partition != 15 { + t.Error("Expected partition 15, found: ", msg.Partition) + } + if msg.Topic != "test" { + t.Errorf(`Expected topic "test", found: %q`, msg.Topic) + } + return nil + }) + mp.ExpectInputAndSucceed() // should actually fail in partitioning + + mp.Input() <- &sarama.ProducerMessage{Topic: "test"} + mp.Input() <- &sarama.ProducerMessage{Topic: "not-test"} + if err := mp.Close(); err != nil { + t.Error(err) + } + + if len(trm.errors) != 1 || !strings.Contains(trm.errors[0], "partitioning unavailable") { + t.Error("Expected to report partitioning unavailable, found", trm.errors) + } +} + +// brokeProducer refuses to partition anything not on the “test” topic, and sends everything on +// that topic to partition 15. +type brokePartitioner struct{} + +func (brokePartitioner) Partition(msg *sarama.ProducerMessage, n int32) (int32, error) { + if msg.Topic == "test" { + return 15, nil + } + return 0, errors.New("partitioning unavailable") +} + +func (brokePartitioner) RequiresConsistency() bool { return false } diff --git a/mocks/mocks.go b/mocks/mocks.go index 5b0e4329e..25671bc28 100644 --- a/mocks/mocks.go +++ b/mocks/mocks.go @@ -15,6 +15,7 @@ package mocks import ( "errors" + "fmt" "github.com/Shopify/sarama" ) @@ -29,6 +30,26 @@ type ErrorReporter interface { // to check the value passed. type ValueChecker func(val []byte) error +// MessageChecker is a function type to be set in each expectation of the producer mocks +// to check the message passed. +type MessageChecker func(*sarama.ProducerMessage) error + +// messageValueChecker wraps a ValueChecker into a MessageChecker. +// Failure to encode the message value will return an error and not call +// the wrapped ValueChecker. +func messageValueChecker(f ValueChecker) MessageChecker { + if f == nil { + return nil + } + return func(msg *sarama.ProducerMessage) error { + val, err := msg.Value.Encode() + if err != nil { + return fmt.Errorf("Input message encoding failed: %s", err.Error()) + } + return f(val) + } +} + var ( errProduceSuccess error = nil errOutOfExpectations = errors.New("No more expectations set on mock") @@ -39,7 +60,42 @@ const AnyOffset int64 = -1000 type producerExpectation struct { Result error - CheckFunction ValueChecker + CheckFunction MessageChecker +} + +// TopicConfig describes a mock topic structure for the mock producers’ partitioning needs. +type TopicConfig struct { + overridePartitions map[string]int32 + defaultPartitions int32 +} + +// NewTopicConfig makes a configuration which defaults to 32 partitions for every topic. +func NewTopicConfig() *TopicConfig { + return &TopicConfig{ + overridePartitions: make(map[string]int32, 0), + defaultPartitions: 32, + } +} + +// SetDefaultPartitions sets the number of partitions any topic not explicitly configured otherwise +// (by SetPartitions) will have from the perspective of created partitioners. +func (pc *TopicConfig) SetDefaultPartitions(n int32) { + pc.defaultPartitions = n +} + +// SetPartitions sets the number of partitions the partitioners will see for specific topics. This +// only applies to messages produced after setting them. +func (pc *TopicConfig) SetPartitions(partitions map[string]int32) { + for p, n := range partitions { + pc.overridePartitions[p] = n + } +} + +func (pc *TopicConfig) partitions(topic string) int32 { + if n, found := pc.overridePartitions[topic]; found { + return n + } + return pc.defaultPartitions } // NewTestConfig returns a config meant to be used by tests. diff --git a/mocks/sync_producer.go b/mocks/sync_producer.go index 8a089e0dc..1220941c6 100644 --- a/mocks/sync_producer.go +++ b/mocks/sync_producer.go @@ -15,16 +15,25 @@ type SyncProducer struct { t ErrorReporter expectations []*producerExpectation lastOffset int64 + + *TopicConfig + newPartitioner sarama.PartitionerConstructor + partitioners map[string]sarama.Partitioner } // NewSyncProducer instantiates a new SyncProducer mock. The t argument should // be the *testing.T instance of your test method. An error will be written to it if -// an expectation is violated. The config argument is currently unused, but is -// maintained to be compatible with the async Producer. +// an expectation is violated. The config argument is used to handle partitioning. func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer { + if config == nil { + config = sarama.NewConfig() + } return &SyncProducer{ - t: t, - expectations: make([]*producerExpectation, 0), + t: t, + expectations: make([]*producerExpectation, 0), + TopicConfig: NewTopicConfig(), + newPartitioner: config.Producer.Partitioner, + partitioners: make(map[string]sarama.Partitioner, 1), } } @@ -45,14 +54,15 @@ func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int3 if len(sp.expectations) > 0 { expectation := sp.expectations[0] sp.expectations = sp.expectations[1:] + topic := msg.Topic + partition, err := sp.partitioner(topic).Partition(msg, sp.partitions(topic)) + if err != nil { + sp.t.Errorf("Partitioner returned an error: %s", err.Error()) + return -1, -1, err + } + msg.Partition = partition if expectation.CheckFunction != nil { - val, err := msg.Value.Encode() - if err != nil { - sp.t.Errorf("Input message encoding failed: %s", err.Error()) - return -1, -1, err - } - - errCheck := expectation.CheckFunction(val) + errCheck := expectation.CheckFunction(msg) if errCheck != nil { sp.t.Errorf("Check function returned an error: %s", errCheck.Error()) return -1, -1, errCheck @@ -82,13 +92,15 @@ func (sp *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error { sp.expectations = sp.expectations[len(msgs):] for i, expectation := range expectations { + topic := msgs[i].Topic + partition, err := sp.partitioner(topic).Partition(msgs[i], sp.partitions(topic)) + if err != nil { + sp.t.Errorf("Partitioner returned an error: %s", err.Error()) + return err + } + msgs[i].Partition = partition if expectation.CheckFunction != nil { - val, err := msgs[i].Value.Encode() - if err != nil { - sp.t.Errorf("Input message encoding failed: %s", err.Error()) - return err - } - errCheck := expectation.CheckFunction(val) + errCheck := expectation.CheckFunction(msgs[i]) if errCheck != nil { sp.t.Errorf("Check function returned an error: %s", errCheck.Error()) return errCheck @@ -106,6 +118,15 @@ func (sp *SyncProducer) SendMessages(msgs []*sarama.ProducerMessage) error { return errOutOfExpectations } +func (sp *SyncProducer) partitioner(topic string) sarama.Partitioner { + partitioner := sp.partitioners[topic] + if partitioner == nil { + partitioner = sp.newPartitioner(topic) + sp.partitioners[topic] = partitioner + } + return partitioner +} + // Close corresponds with the Close method of sarama's SyncProducer implementation. // By closing a mock syncproducer, you also tell it that no more SendMessage calls will follow, // so it will write an error to the test state if there's any remaining expectations. @@ -124,14 +145,32 @@ func (sp *SyncProducer) Close() error { // Setting expectations //////////////////////////////////////////////// +// ExpectSendMessageWithMessageCheckerFunctionAndSucceed sets an expectation on the mock producer +// that SendMessage will be called. The mock producer will first call the given function to check +// the message. It will cascade the error of the function, if any, or handle the message as if it +// produced successfully, i.e. by returning a valid partition, and offset, and a nil error. +func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndSucceed(cf MessageChecker) { + sp.l.Lock() + defer sp.l.Unlock() + sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf}) +} + +// ExpectSendMessageWithMessageCheckerFunctionAndFail sets an expectation on the mock producer that +// SendMessage will be called. The mock producer will first call the given function to check the +// message. It will cascade the error of the function, if any, or handle the message as if it +// failed to produce successfully, i.e. by returning the provided error. +func (sp *SyncProducer) ExpectSendMessageWithMessageCheckerFunctionAndFail(cf MessageChecker, err error) { + sp.l.Lock() + defer sp.l.Unlock() + sp.expectations = append(sp.expectations, &producerExpectation{Result: err, CheckFunction: cf}) +} + // ExpectSendMessageWithCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage // will be called. The mock producer will first call the given function to check the message value. // It will cascade the error of the function, if any, or handle the message as if it produced // successfully, i.e. by returning a valid partition, and offset, and a nil error. func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) { - sp.l.Lock() - defer sp.l.Unlock() - sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf}) + sp.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(messageValueChecker(cf)) } // ExpectSendMessageWithCheckerFunctionAndFail sets an expectation on the mock producer that SendMessage will be @@ -139,21 +178,19 @@ func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueC // It will cascade the error of the function, if any, or handle the message as if it failed // to produce successfully, i.e. by returning the provided error. func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) { - sp.l.Lock() - defer sp.l.Unlock() - sp.expectations = append(sp.expectations, &producerExpectation{Result: err, CheckFunction: cf}) + sp.ExpectSendMessageWithMessageCheckerFunctionAndFail(messageValueChecker(cf), err) } // ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be // called. The mock producer will handle the message as if it produced successfully, i.e. by // returning a valid partition, and offset, and a nil error. func (sp *SyncProducer) ExpectSendMessageAndSucceed() { - sp.ExpectSendMessageWithCheckerFunctionAndSucceed(nil) + sp.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(nil) } // ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be // called. The mock producer will handle the message as if it failed to produce // successfully, i.e. by returning the provided error. func (sp *SyncProducer) ExpectSendMessageAndFail(err error) { - sp.ExpectSendMessageWithCheckerFunctionAndFail(nil, err) + sp.ExpectSendMessageWithMessageCheckerFunctionAndFail(nil, err) } diff --git a/mocks/sync_producer_test.go b/mocks/sync_producer_test.go index 0e47ddd85..8c4582ba9 100644 --- a/mocks/sync_producer_test.go +++ b/mocks/sync_producer_test.go @@ -233,7 +233,7 @@ func TestSyncProducerSendMessagesFaultyEncoder(t *testing.T) { msg1 := &sarama.ProducerMessage{Topic: "test", Value: faultyEncoder("123")} msgs := []*sarama.ProducerMessage{msg1} - if err := sp.SendMessages(msgs); err == nil || !strings.HasPrefix(err.Error(), "encode error") { + if err := sp.SendMessages(msgs); err == nil || !strings.Contains(err.Error(), "encode error") { t.Error("Encoding error expected, found: ", err) }