diff --git a/async_producer.go b/async_producer.go index 8e229490f..7e0339b64 100644 --- a/async_producer.go +++ b/async_producer.go @@ -119,6 +119,8 @@ type ProducerMessage struct { retries int flags flagSet + + keyCache, valueCache []byte } func (m *ProducerMessage) byteSize() int { @@ -135,6 +137,8 @@ func (m *ProducerMessage) byteSize() int { func (m *ProducerMessage) clear() { m.flags = 0 m.retries = 0 + m.keyCache = nil + m.valueCache = nil } // ProducerError is the type of error generated when the producer fails to deliver a message. @@ -660,6 +664,7 @@ func (f *flusher) run() { } func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage { + var err error msgSets := make(map[string]map[int32][]*ProducerMessage) for i, msg := range batch { @@ -679,6 +684,22 @@ func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32] continue } + if msg.Key != nil { + if msg.keyCache, err = msg.Key.Encode(); err != nil { + f.parent.returnError(msg, err) + batch[i] = nil + continue + } + } + + if msg.Value != nil { + if msg.valueCache, err = msg.Value.Encode(); err != nil { + f.parent.returnError(msg, err) + batch[i] = nil + continue + } + } + partitionSet := msgSets[msg.Topic] if partitionSet == nil { partitionSet = make(map[int32][]*ProducerMessage) @@ -786,21 +807,6 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa setToSend := new(MessageSet) setSize := 0 for _, msg := range msgSet { - var keyBytes, valBytes []byte - var err error - if msg.Key != nil { - if keyBytes, err = msg.Key.Encode(); err != nil { - p.returnError(msg, err) - continue - } - } - if msg.Value != nil { - if valBytes, err = msg.Value.Encode(); err != nil { - p.returnError(msg, err) - continue - } - } - if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes { // compression causes message-sets to be wrapped as single messages, which have tighter // size requirements, so we have to respect those limits @@ -815,7 +821,7 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa } setSize += msg.byteSize() - setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes}) + setToSend.addMessage(&Message{Codec: CompressionNone, Key: msg.keyCache, Value: msg.valueCache}) empty = false }