From 2840a3795ada42d78326c26fb2884512c02279be Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Thu, 24 Sep 2015 16:22:49 -0400 Subject: [PATCH] Fix handling of unencodable messages Move `Key.Encode()` and `Value.Encode()` calls slightly earlier (to `groupAndFilter`) where we have access to the batch, so that messages which fail to encode can be removed from consideration on error. Otherwise failed messages would not be removed from the batch and could end up returned twice. Add cache members to the ProducerMessage struct to store the results until we actually need them. Fixes #449. This is perhaps not the most elegant solution. However it is correct and a better solution would be a lot more invasive. This will do in order to ship the fix in a 1.5.1 patch release. --- async_producer.go | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/async_producer.go b/async_producer.go index 8e229490f..7e0339b64 100644 --- a/async_producer.go +++ b/async_producer.go @@ -119,6 +119,8 @@ type ProducerMessage struct { retries int flags flagSet + + keyCache, valueCache []byte } func (m *ProducerMessage) byteSize() int { @@ -135,6 +137,8 @@ func (m *ProducerMessage) byteSize() int { func (m *ProducerMessage) clear() { m.flags = 0 m.retries = 0 + m.keyCache = nil + m.valueCache = nil } // ProducerError is the type of error generated when the producer fails to deliver a message. 
@@ -660,6 +664,7 @@ func (f *flusher) run() { } func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage { + var err error msgSets := make(map[string]map[int32][]*ProducerMessage) for i, msg := range batch { @@ -679,6 +684,22 @@ func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32] continue } + if msg.Key != nil { + if msg.keyCache, err = msg.Key.Encode(); err != nil { + f.parent.returnError(msg, err) + batch[i] = nil + continue + } + } + + if msg.Value != nil { + if msg.valueCache, err = msg.Value.Encode(); err != nil { + f.parent.returnError(msg, err) + batch[i] = nil + continue + } + } + partitionSet := msgSets[msg.Topic] if partitionSet == nil { partitionSet = make(map[int32][]*ProducerMessage) @@ -786,21 +807,6 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa setToSend := new(MessageSet) setSize := 0 for _, msg := range msgSet { - var keyBytes, valBytes []byte - var err error - if msg.Key != nil { - if keyBytes, err = msg.Key.Encode(); err != nil { - p.returnError(msg, err) - continue - } - } - if msg.Value != nil { - if valBytes, err = msg.Value.Encode(); err != nil { - p.returnError(msg, err) - continue - } - } - if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes { // compression causes message-sets to be wrapped as single messages, which have tighter // size requirements, so we have to respect those limits @@ -815,7 +821,7 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa } setSize += msg.byteSize() - setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes}) + setToSend.addMessage(&Message{Codec: CompressionNone, Key: msg.keyCache, Value: msg.valueCache}) empty = false }