Shuffle messages less in the producer.
Put them in a map right up front in the aggregator; it only requires tracking
one extra piece of metadata (the total number of messages in the map), and it
means we don't have to shuffle them into this form before constructing the
request anyway.

One piece of #433.
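
For readers skimming the diff below, here is a minimal standalone sketch of the buffering scheme this commit moves to. The type and field names (messageBuffer, bufferCount, bufferBytes) mirror the diff, but ProducerMessage is reduced to a few illustrative fields, so this is a sketch rather than the real Sarama code.

package main

import "fmt"

// Illustrative stand-in for Sarama's ProducerMessage (the real struct has more fields).
type ProducerMessage struct {
	Topic     string
	Partition int32
	Value     string
}

// messageBuffer groups messages by topic and partition as they arrive,
// which is already the shape the produce request needs.
type messageBuffer map[string]map[int32][]*ProducerMessage

type aggregator struct {
	buffer      messageBuffer
	bufferCount int // total messages across all topics/partitions
	bufferBytes int // tracked in the real code; unused in this sketch
}

func (a *aggregator) add(msg *ProducerMessage) {
	if a.buffer[msg.Topic] == nil {
		a.buffer[msg.Topic] = make(map[int32][]*ProducerMessage)
	}
	a.buffer[msg.Topic][msg.Partition] = append(a.buffer[msg.Topic][msg.Partition], msg)
	a.bufferCount++
}

func main() {
	a := &aggregator{buffer: make(messageBuffer)}
	a.add(&ProducerMessage{Topic: "logs", Partition: 0, Value: "a"})
	a.add(&ProducerMessage{Topic: "logs", Partition: 1, Value: "b"})
	fmt.Println(a.bufferCount, len(a.buffer["logs"]), len(a.buffer["logs"][0])) // 2 2 1
}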
  • Loading branch information
eapache committed Aug 13, 2015
1 parent ad7f1f7 commit 7d70e73
Showing 1 changed file with 116 additions and 104 deletions.
220 changes: 116 additions & 104 deletions async_producer.go
Original file line number Diff line number Diff line change
@@ -490,13 +490,14 @@ func (pp *partitionProducer) updateLeader() error {
// one per broker, constructs both an aggregator and a flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
input := make(chan *ProducerMessage)
bridge := make(chan []*ProducerMessage)
bridge := make(chan messageBuffer)

a := &aggregator{
parent: p,
broker: broker,
input: input,
output: bridge,
buffer: make(messageBuffer),
}
go withRecover(a.run)

@@ -511,21 +512,31 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag
return input
}

type messageBuffer map[string]map[int32][]*ProducerMessage

func (mb messageBuffer) each(action func([]*ProducerMessage, error), err error) {
for _, partitions := range mb {
for _, messages := range partitions {
action(messages, err)
}
}
}
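
The each helper is what lets the flusher replace calls like f.parent.retryMessages(batch, closing) with batch.each(f.parent.retryMessages, closing): the callback is applied once per (topic, partition) slice. A hedged, self-contained usage sketch follows; the types here are trimmed stand-ins, not the real Sarama ones.

package main

import (
	"errors"
	"fmt"
)

// Trimmed stand-in for the real ProducerMessage.
type ProducerMessage struct{ Value string }

type messageBuffer map[string]map[int32][]*ProducerMessage

// Same shape as the helper added in this commit.
func (mb messageBuffer) each(action func([]*ProducerMessage, error), err error) {
	for _, partitions := range mb {
		for _, messages := range partitions {
			action(messages, err)
		}
	}
}

func main() {
	batch := messageBuffer{
		"logs": {0: {{Value: "a"}}, 1: {{Value: "b"}, {Value: "c"}}},
	}
	closing := errors.New("broker shutting down")
	// Stand-in for batch.each(f.parent.retryMessages, closing) in flusher.run:
	batch.each(func(msgs []*ProducerMessage, err error) {
		fmt.Printf("retrying %d message(s): %v\n", len(msgs), err)
	}, closing)
}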

// groups messages together into appropriately-sized batches for sending to the broker
// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
type aggregator struct {
parent *asyncProducer
broker *Broker
input <-chan *ProducerMessage
output chan<- []*ProducerMessage
output chan<- messageBuffer

buffer []*ProducerMessage
bufferBytes int
timer <-chan time.Time
buffer messageBuffer
bufferCount, bufferBytes int
timer <-chan time.Time
}

func (a *aggregator) run() {
var output chan<- []*ProducerMessage
var output chan<- messageBuffer

for {
select {
@@ -541,7 +552,11 @@ func (a *aggregator) run() {
output = nil
}

a.buffer = append(a.buffer, msg)
if a.buffer[msg.Topic] == nil {
a.buffer[msg.Topic] = make(map[int32][]*ProducerMessage)
}
a.buffer[msg.Topic][msg.Partition] = append(a.buffer[msg.Topic][msg.Partition], msg)
a.bufferCount += 1
a.bufferBytes += msg.byteSize()

if a.readyToFlush(msg) {
@@ -573,7 +588,7 @@ func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool {
case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes:
return true
// Would we overflow simply in number of messages?
case a.parent.conf.Producer.Flush.MaxMessages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.MaxMessages:
case a.parent.conf.Producer.Flush.MaxMessages > 0 && a.bufferCount >= a.parent.conf.Producer.Flush.MaxMessages:
return true
default:
return false
@@ -589,7 +604,7 @@ func (a *aggregator) readyToFlush(msg *ProducerMessage) bool {
case msg.flags&chaser == chaser:
return true
// If we've passed the message trigger-point
case a.parent.conf.Producer.Flush.Messages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.Messages:
case a.parent.conf.Producer.Flush.Messages > 0 && a.bufferCount >= a.parent.conf.Producer.Flush.Messages:
return true
// If we've passed the byte trigger-point
case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes:
@@ -601,15 +616,16 @@ func (a *aggregator) readyToFlush(msg *ProducerMessage) bool {

func (a *aggregator) reset() {
a.timer = nil
a.buffer = nil
a.buffer = make(messageBuffer)
a.bufferBytes = 0
a.bufferCount = 0
}
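
One detail the diff relies on but doesn't spell out: now that buffer is a nested map, len(a.buffer) counts topics rather than messages, which is why the flush and overflow checks switch to the explicit bufferCount. A tiny illustration with assumed values (not taken from the source):

package main

import "fmt"

type ProducerMessage struct{ Value string }
type messageBuffer map[string]map[int32][]*ProducerMessage

func main() {
	buffer := messageBuffer{
		"logs": {0: {{Value: "a"}, {Value: "b"}}, 1: {{Value: "c"}}},
	}
	// len on the nested map counts topics, not messages...
	fmt.Println(len(buffer)) // 1
	// ...so the aggregator has to track the message total itself (bufferCount would be 3 here).
}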

// takes a batch at a time from the aggregator and sends to the broker
type flusher struct {
parent *asyncProducer
broker *Broker
input <-chan []*ProducerMessage
input <-chan messageBuffer

currentRetries map[string]map[int32]error
}
@@ -621,12 +637,12 @@ func (f *flusher) run() {

for batch := range f.input {
if closing != nil {
f.parent.retryMessages(batch, closing)
batch.each(f.parent.retryMessages, closing)
continue
}

msgSets := f.groupAndFilter(batch)
request := f.parent.buildRequest(msgSets)
f.filter(batch)
request := f.buildRequest(batch)
if request == nil {
continue
}
@@ -637,58 +653,116 @@ func (f *flusher) run() {
case nil:
break
case PacketEncodingError:
f.parent.returnErrors(batch, err)
batch.each(f.parent.returnErrors, err)
continue
default:
Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err)
f.parent.abandonBrokerConnection(f.broker)
_ = f.broker.Close()
closing = err
f.parent.retryMessages(batch, err)
batch.each(f.parent.retryMessages, err)
continue
}

if response == nil {
// this only happens when RequiredAcks is NoResponse, so we have to assume success
f.parent.returnSuccesses(batch)
batch.each(f.parent.returnSuccesses, nil)
continue
}

f.parseResponse(msgSets, response)
f.parseResponse(batch, response)
}
Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
}

func (f *flusher) groupAndFilter(batch []*ProducerMessage) map[string]map[int32][]*ProducerMessage {
msgSets := make(map[string]map[int32][]*ProducerMessage)
func (f *flusher) filter(batch messageBuffer) {
for topic, partitions := range batch {
for partition, messages := range partitions {
for i, msg := range messages {
if f.currentRetries[topic] != nil && f.currentRetries[topic][partition] != nil {
// we're currently retrying this partition so we need to filter out this message
f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[topic][partition])
messages[i] = nil

if msg.flags&chaser == chaser {
// ...but now we can start processing future messages again
Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
f.broker.ID(), topic, partition)
delete(f.currentRetries[topic], partition)
}
}
}
}
}
}

for i, msg := range batch {
func (f *flusher) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {

if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
// we're currently retrying this partition so we need to filter out this message
f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])
batch[i] = nil
req := &ProduceRequest{
RequiredAcks: f.parent.conf.Producer.RequiredAcks,
Timeout: int32(f.parent.conf.Producer.Timeout / time.Millisecond),
}
empty := true

if msg.flags&chaser == chaser {
// ...but now we can start processing future messages again
Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
f.broker.ID(), msg.Topic, msg.Partition)
delete(f.currentRetries[msg.Topic], msg.Partition)
}
for topic, partitionSet := range batch {
for partition, msgSet := range partitionSet {
setToSend := new(MessageSet)
setSize := 0
for _, msg := range msgSet {
if msg == nil {
continue
}

continue
}
var keyBytes, valBytes []byte
var err error
if msg.Key != nil {
if keyBytes, err = msg.Key.Encode(); err != nil {
f.parent.returnError(msg, err)
continue
}
}
if msg.Value != nil {
if valBytes, err = msg.Value.Encode(); err != nil {
f.parent.returnError(msg, err)
continue
}
}

partitionSet := msgSets[msg.Topic]
if partitionSet == nil {
partitionSet = make(map[int32][]*ProducerMessage)
msgSets[msg.Topic] = partitionSet
}
if f.parent.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > f.parent.conf.Producer.MaxMessageBytes {
// compression causes message-sets to be wrapped as single messages, which have tighter
// size requirements, so we have to respect those limits
valBytes, err := encode(setToSend)
if err != nil {
Logger.Println(err) // if this happens, it's basically our fault.
panic(err)
}
req.AddMessage(topic, partition, &Message{Codec: f.parent.conf.Producer.Compression, Key: nil, Value: valBytes})
setToSend = new(MessageSet)
setSize = 0
}
setSize += msg.byteSize()

partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes})
empty = false
}

if f.parent.conf.Producer.Compression == CompressionNone {
req.AddSet(topic, partition, setToSend)
} else {
valBytes, err := encode(setToSend)
if err != nil {
Logger.Println(err) // if this happens, it's basically our fault.
panic(err)
}
req.AddMessage(topic, partition, &Message{Codec: f.parent.conf.Producer.Compression, Key: nil, Value: valBytes})
}
}
}

return msgSets
if empty {
return nil
}
return req
}

func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage, response *ProduceResponse) {
@@ -708,7 +782,7 @@ func (f *flusher) parseResponse(msgSets map[string]map[int32][]*ProducerMessage,
for i := range msgs {
msgs[i].Offset = block.Offset + int64(i)
}
f.parent.returnSuccesses(msgs)
f.parent.returnSuccesses(msgs, nil)
// Retriable errors
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
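
For context on the offset bookkeeping shown a few lines up (unchanged by this commit): the produce response carries one base offset per partition, and each message in the flushed slice is assigned base + its index. A reduced sketch, not the real parseResponse:

package main

import "fmt"

type ProducerMessage struct{ Offset int64 }

func main() {
	// The broker returns a single base offset for the whole appended set;
	// each message gets base + its index within that set.
	msgs := []*ProducerMessage{{}, {}, {}}
	var blockOffset int64 = 41 // assumed value for illustration
	for i := range msgs {
		msgs[i].Offset = blockOffset + int64(i)
	}
	fmt.Println(msgs[0].Offset, msgs[1].Offset, msgs[2].Offset) // 41 42 43
}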
@@ -776,68 +850,6 @@ func (p *asyncProducer) shutdown() {
close(p.successes)
}

func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {

req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)}
empty := true

for topic, partitionSet := range batch {
for partition, msgSet := range partitionSet {
setToSend := new(MessageSet)
setSize := 0
for _, msg := range msgSet {
var keyBytes, valBytes []byte
var err error
if msg.Key != nil {
if keyBytes, err = msg.Key.Encode(); err != nil {
p.returnError(msg, err)
continue
}
}
if msg.Value != nil {
if valBytes, err = msg.Value.Encode(); err != nil {
p.returnError(msg, err)
continue
}
}

if p.conf.Producer.Compression != CompressionNone && setSize+msg.byteSize() > p.conf.Producer.MaxMessageBytes {
// compression causes message-sets to be wrapped as single messages, which have tighter
// size requirements, so we have to respect those limits
valBytes, err := encode(setToSend)
if err != nil {
Logger.Println(err) // if this happens, it's basically our fault.
panic(err)
}
req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
setToSend = new(MessageSet)
setSize = 0
}
setSize += msg.byteSize()

setToSend.addMessage(&Message{Codec: CompressionNone, Key: keyBytes, Value: valBytes})
empty = false
}

if p.conf.Producer.Compression == CompressionNone {
req.AddSet(topic, partition, setToSend)
} else {
valBytes, err := encode(setToSend)
if err != nil {
Logger.Println(err) // if this happens, it's basically our fault.
panic(err)
}
req.AddMessage(topic, partition, &Message{Codec: p.conf.Producer.Compression, Key: nil, Value: valBytes})
}
}
}

if empty {
return nil
}
return req
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
msg.clear()
pErr := &ProducerError{Msg: msg, Err: err}
@@ -857,7 +869,7 @@ func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
}
}

func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage, unused error) {
for _, msg := range batch {
if msg == nil {
continue
