Merge pull request #2880 from Scarjit/main
kafka_franz: Add option to change metadata.max.age
rockwotj authored Sep 19, 2024
2 parents 6f0a557 + e7a85d5 commit a44acad
Showing 4 changed files with 30 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
- New experimental `aws_bedrock_embeddings` processor. (@rockwotj)
- New experimental `cohere_chat` and `cohere_embeddings` processors. (@rockwotj)
- New experimental `questdb` output. (@sklarsa)
- Field `metadata_max_age` added to the `kafka_franz` input. (@Scarjit)

## 4.36.0 - 2024-09-11

10 changes: 10 additions & 0 deletions docs/modules/components/pages/inputs/kafka_franz.adoc
@@ -81,6 +81,7 @@ input:
period: ""
check: ""
processors: [] # No default (optional)
metadata_max_age: 5m
```
--
@@ -694,4 +695,13 @@ processors:
format: json_array
```
=== `metadata_max_age`
The maximum age of metadata before it is refreshed.
*Type*: `string`
*Default*: `"5m"`
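For illustration, a minimal Go sketch of the duration notation behind the `"5m"` default, assuming the field accepts Go-style duration strings, as suggested by the `FieldDuration` call in the plugin code further down:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// "5m" is a duration string; with Go-style notation it parses to five minutes.
	d, err := time.ParseDuration("5m")
	if err != nil {
		panic(err)
	}
	fmt.Println(d == 5*time.Minute) // true
}
```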
10 changes: 10 additions & 0 deletions docs/modules/components/pages/inputs/ockam_kafka.adoc
@@ -90,6 +90,7 @@ input:
period: ""
check: ""
processors: [] # No default (optional)
metadata_max_age: 5m
seed_brokers: [] # No default (optional)
disable_content_encryption: false
enrollment_ticket: "" # No default (optional)
@@ -701,6 +702,15 @@ processors:
format: json_array
```
=== `kafka.metadata_max_age`
The maximum age of metadata before it is refreshed.
*Type*: `string`
*Default*: `"5m"`
=== `kafka.seed_brokers`
A list of broker addresses to connect to when establishing connections. If an item of the list contains commas it will be expanded into multiple addresses.
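A rough sketch of the comma-expansion behaviour described above; the broker addresses are hypothetical and this is an illustration rather than the library's actual code:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical seed_brokers list; the second item carries two addresses
	// separated by a comma and is expanded into separate entries.
	seedBrokers := []string{"broker-a:9092", "broker-b:9092,broker-c:9092"}

	var expanded []string
	for _, item := range seedBrokers {
		for _, addr := range strings.Split(item, ",") {
			expanded = append(expanded, strings.TrimSpace(addr))
		}
	}

	fmt.Println(expanded) // [broker-a:9092 broker-b:9092 broker-c:9092]
}
```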
9 changes: 9 additions & 0 deletions internal/impl/kafka/input_kafka_franz.go
@@ -126,6 +126,10 @@ Finally, it's also possible to specify an explicit offset to consume from by add
		service.NewBatchPolicyField("batching").
			Description("Allows you to configure a xref:configuration:batching.adoc[batching policy] that applies to individual topic partitions in order to batch messages together before flushing them for processing. Batching can be beneficial for performance as well as useful for windowed processing, and doing so this way preserves the ordering of topic partitions.").
			Advanced(),
		service.NewDurationField("metadata_max_age").
			Description("The maximum age of metadata before it is refreshed.").
			Default("5m").
			Advanced(),
	}
}

@@ -166,6 +170,7 @@ type FranzKafkaReader struct {
	regexPattern   bool
	multiHeader    bool
	batchPolicy    service.BatchPolicy
	metadataMaxAge time.Duration

	batchChan atomic.Value
	res       *service.Resources
@@ -269,6 +274,9 @@ func NewFranzKafkaReaderFromConfig(conf *service.ParsedConfig, res *service.Reso
	if f.saslConfs, err = SASLMechanismsFromConfig(conf); err != nil {
		return nil, err
	}
	if f.metadataMaxAge, err = conf.FieldDuration("metadata_max_age"); err != nil {
		return nil, err
	}

	return &f, nil
}
@@ -642,6 +650,7 @@ func (f *FranzKafkaReader) Connect(ctx context.Context) error {
		kgo.ConsumerGroup(f.consumerGroup),
		kgo.ClientID(f.clientID),
		kgo.Rack(f.rackID),
		kgo.MetadataMaxAge(f.metadataMaxAge),
	}

	if f.consumerGroup != "" {
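For context, a minimal standalone franz-go sketch of what the new option controls on the underlying client; the broker address and topic are hypothetical and error handling is kept to a minimum:

```go
package main

import (
	"context"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// kgo.MetadataMaxAge is the client option that the new metadata_max_age
	// field feeds into; 5m mirrors the field's default.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),  // hypothetical broker
		kgo.ConsumeTopics("example_topic"), // hypothetical topic
		kgo.MetadataMaxAge(5*time.Minute),
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// A single poll is enough to exercise metadata fetching.
	fetches := client.PollFetches(context.Background())
	_ = fetches
}
```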
