Skip to content

Commit

Permalink
[fix] Send Close Command on Producer/Consumer create timeout (#1061)
Browse files Browse the repository at this point in the history
### Motivation

This change is the same as apache/pulsar#13161 and apache/pulsar#16616, and is justified by these lines of our binary protocol spec:

* https://github.com/apache/pulsar-site/blob/9b4b3d39014bd47c0bb9f66742b89bcb40ed7f07/docs/developing-binary-protocol.md?plain=1#L301-L304
* https://github.com/apache/pulsar-site/blob/9b4b3d39014bd47c0bb9f66742b89bcb40ed7f07/docs/developing-binary-protocol.md?plain=1#L468-L471

### Modifications

* When a producer or a consumer times out during creation, make an attempt to close the producer or consumer by sending the appropriate close command. Failures can safely be ignored because the only time that the close will actually matter is when the TCP connection is open for other protocol messages. The one nuance is that we send the close command to the same address pair that we send the create command.

(cherry picked from commit d4e08c6)
  • Loading branch information
michaeljmarshall authored and RobertIndie committed Sep 7, 2023
1 parent 5dc2a49 commit 1724dc9
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 0 deletions.
9 changes: 9 additions & 0 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1737,6 +1737,15 @@ func (pc *partitionConsumer) grabConn() error {

if err != nil {
pc.log.WithError(err).Error("Failed to create consumer")
if err == internal.ErrRequestTimeOut {
requestID := pc.client.rpcClient.NewRequestID()
cmdClose := &pb.CommandCloseConsumer{
ConsumerId: proto.Uint64(pc.consumerID),
RequestId: proto.Uint64(requestID),
}
_, _ = pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
}
return err
}

Expand Down
8 changes: 8 additions & 0 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,14 @@ func (p *partitionProducer) grabCnx() error {
res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
if err != nil {
p.log.WithError(err).Error("Failed to create producer at send PRODUCER request")
if err == internal.ErrRequestTimeOut {
id := p.client.rpcClient.NewRequestID()
_, _ = p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_CLOSE_PRODUCER,
&pb.CommandCloseProducer{
ProducerId: &p.producerID,
RequestId: &id,
})
}
return err
}

Expand Down

0 comments on commit 1724dc9

Please sign in to comment.