Skip to content

Commit

Permalink
Merge pull request #1686 from xujianhai666/shopify-master
Browse files Browse the repository at this point in the history
feat: allow AsyncProducer to have MaxOpenRequests inflight produce requests per broker
  • Loading branch information
dnwe authored Jan 12, 2022
2 parents a059adb + 8104f4d commit a060eca
Showing 1 changed file with 48 additions and 4 deletions.
52 changes: 48 additions & 4 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,12 +670,20 @@ func (pp *partitionProducer) updateLeader() error {
})
}

type pendingResponse struct {
set *produceSet
version int16
promise *responsePromise
response *ProduceResponse
}

// one per broker; also constructs an associated flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
var (
input = make(chan *ProducerMessage)
bridge = make(chan *produceSet)
responses = make(chan *brokerProducerResponse)
pendings = make(chan *pendingResponse, p.conf.Net.MaxOpenRequests-1)
)

bp := &brokerProducer{
Expand All @@ -692,18 +700,54 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {

// minimal bridge to make the network response `select`able
go withRecover(func() {
defer close(pendings)
for set := range bridge {
set := set
var err error
var response *ProduceResponse
var promise *responsePromise

request := set.buildRequest()
if request.RequiredAcks != NoResponse {
response = new(ProduceResponse)
}

response, err := broker.Produce(request)
responseHeaderVersion := int16(-1)
if response != nil {
responseHeaderVersion = response.headerVersion()
}

promise, err = broker.send(request, response != nil, responseHeaderVersion)

// return quickly if failed or ackMode: NoResponse
if err != nil || promise == nil {
responses <- &brokerProducerResponse{
set: set,
err: err,
res: response,
}
continue
}
pending := &pendingResponse{set: set, version: request.version(), response: response, promise: promise}
pendings <- pending
}
})

go withRecover(func() {
defer close(responses)
for pending := range pendings {
var err error
select {
case buf := <-pending.promise.packets:
err = versionedDecode(buf, pending.response, pending.version)
case err = <-pending.promise.errors:
}
responses <- &brokerProducerResponse{
set: set,
set: pending.set,
err: err,
res: response,
res: pending.response,
}
}
close(responses)
})

if p.conf.Producer.Retry.Max <= 0 {
Expand Down

0 comments on commit a060eca

Please sign in to comment.