Skip to content

Commit

Permalink
[issue #807] dlq topic producer options (#809)
Browse files Browse the repository at this point in the history
* dlq topic producer options

* user defined producer options

* add another unit test

* retry letter queue producer use the same dlq producer option

Fixes #807 

### Motivation

To customize producer options for DLQ topics.

### Modifications

Add DLQProducerOptions to consumer's DLQ policy.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

The existing DLQ test is enhanced to cover customized producer options.

### Does this pull request potentially affect one of the following parts:
A new producer options field, `DLQProuducerOptions`, is introduced for DLQ policy to govern the producer options.
```
ConsumerOptions{
		Topics:              topics,
		DLQ: &DLQPolicy{
			MaxDeliveries:   3,
			DeadLetterTopic: dlqTopic,
			DLQProducerOptions: ProducerOptions{
				BatchingMaxPublishDelay: 100 * time.Millisecond,
			},
		},
```
  - Dependencies (does it add or upgrade a dependency): no
  - The public API: yes
  - The schema: no
  - The default values of configurations: yes (the existing lz4 compression type is the default.)
  - The wire protocol: no
 
### Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (GoDocs)
  - If a feature is not applicable for documentation, explain why?
  - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
  • Loading branch information
zzzming authored Sep 3, 2022
1 parent f8dc88e commit 68e4317
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 15 deletions.
3 changes: 3 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type DLQPolicy struct {
// DeadLetterTopic specifies the name of the topic where the failing messages will be sent.
DeadLetterTopic string

// ProducerOptions is the producer options to produce messages to the DLQ and RLQ topic
ProducerOptions ProducerOptions

// RetryLetterTopic specifies the name of the topic where the retry messages will be sent.
RetryLetterTopic string
}
Expand Down
28 changes: 24 additions & 4 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,19 @@ func TestConsumerReceiveErrAfterClose(t *testing.T) {
}

func TestDLQ(t *testing.T) {
DLQWithProducerOptions(t, nil)
}

func TestDLQWithProducerOptions(t *testing.T) {
DLQWithProducerOptions(t,
&ProducerOptions{
BatchingMaxPublishDelay: 100 * time.Millisecond,
BatchingMaxSize: 64 * 1024,
CompressionType: ZLib,
})
}

func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
Expand All @@ -1045,15 +1058,19 @@ func TestDLQ(t *testing.T) {
ctx := context.Background()

// create consumer
dlqPolicy := DLQPolicy{
MaxDeliveries: 3,
DeadLetterTopic: dlqTopic,
}
if prodOpt != nil {
dlqPolicy.ProducerOptions = *prodOpt
}
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
NackRedeliveryDelay: 1 * time.Second,
Type: Shared,
DLQ: &DLQPolicy{
MaxDeliveries: 3,
DeadLetterTopic: dlqTopic,
},
DLQ: &dlqPolicy,
})
assert.Nil(t, err)
defer consumer.Close()
Expand Down Expand Up @@ -1156,6 +1173,9 @@ func TestDLQMultiTopics(t *testing.T) {
DLQ: &DLQPolicy{
MaxDeliveries: 3,
DeadLetterTopic: dlqTopic,
ProducerOptions: ProducerOptions{
BatchingMaxPublishDelay: 100 * time.Millisecond,
},
},
})
assert.Nil(t, err)
Expand Down
16 changes: 10 additions & 6 deletions pulsar/dlq_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,16 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
// Retry to create producer indefinitely
backoff := &internal.Backoff{}
for {
producer, err := r.client.CreateProducer(ProducerOptions{
Topic: r.policy.DeadLetterTopic,
CompressionType: LZ4,
BatchingMaxPublishDelay: 100 * time.Millisecond,
Schema: schema,
})
opt := r.policy.ProducerOptions
opt.Topic = r.policy.DeadLetterTopic
opt.Schema = schema

// the origin code sets to LZ4 compression with no options
// so the new design allows compression type to be overwritten but still set lz4 by default
if r.policy.ProducerOptions.CompressionType == NoCompression {
opt.CompressionType = LZ4
}
producer, err := r.client.CreateProducer(opt)

if err != nil {
r.log.WithError(err).Error("Failed to create DLQ producer")
Expand Down
14 changes: 9 additions & 5 deletions pulsar/retry_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,15 @@ func (r *retryRouter) getProducer() Producer {
// Retry to create producer indefinitely
backoff := &internal.Backoff{}
for {
producer, err := r.client.CreateProducer(ProducerOptions{
Topic: r.policy.RetryLetterTopic,
CompressionType: LZ4,
BatchingMaxPublishDelay: 100 * time.Millisecond,
})
opt := r.policy.ProducerOptions
opt.Topic = r.policy.RetryLetterTopic
// the origin code sets to LZ4 compression with no options
// so the new design allows compression type to be overwritten but still set lz4 by default
if r.policy.ProducerOptions.CompressionType == NoCompression {
opt.CompressionType = LZ4
}

producer, err := r.client.CreateProducer(opt)

if err != nil {
r.log.WithError(err).Error("Failed to create RLQ producer")
Expand Down

0 comments on commit 68e4317

Please sign in to comment.