Merge pull request #878 from mihaitodor/kafka-rack-awareness
Add rack_id field for Kafka components
Jeffail authored Sep 16, 2021
2 parents 733f957 + 6f6b122 commit 1afd2dd
Showing 11 changed files with 41 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file.

- Fields `cache_control`, `content_disposition`, `content_language` and `website_redirect_location` added to the `aws_s3` output.
- Field `cors.enabled` and `cors.allowed_origins` added to the server wide `http` config.
- Field `rack_id` added to the Kafka components, specifying an optional rack identifier for the Kafka client.
- Allow mapping imports in Bloblang environments to be disabled.

### Fixed
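For context, the new field corresponds to sarama's `Config.RackID`, the client-side half of Kafka's rack awareness (`client.rack`). When brokers run Kafka 2.4+ with a rack-aware `replica.selector.class` and their own `broker.rack` set, a consumer that advertises a matching rack can fetch from the nearest replica rather than the partition leader (KIP-392), reducing cross-zone traffic. Below is a minimal sketch of the wiring this commit adds, assuming the sarama library; the rack value `us-east-1a` is a hypothetical example.

```go
package main

import (
	"fmt"

	"github.com/Shopify/sarama" // the Kafka client used by these components
)

func main() {
	// Mirrors the wiring added in this commit: the parsed rack_id value is
	// copied onto the sarama client config before any connection is made.
	config := sarama.NewConfig()
	config.ClientID = "benthos_kafka_input"
	config.RackID = "us-east-1a" // hypothetical rack identifier

	fmt.Printf("client %s advertises rack %q\n", config.ClientID, config.RackID)
}
```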
2 changes: 2 additions & 0 deletions lib/input/kafka.go
@@ -74,6 +74,7 @@ You can access these metadata fields using [function interpolation](/docs/config
sasl.FieldSpec(),
docs.FieldCommon("consumer_group", "An identifier for the consumer group of the connection. This field can be explicitly made empty in order to disable stored offsets for the consumed topic partitions."),
docs.FieldCommon("client_id", "An identifier for the client connection."),
docs.FieldAdvanced("rack_id", "A rack identifier for this client."),
docs.FieldAdvanced("start_from_oldest", "If an offset is not found for a topic partition, determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset."),
docs.FieldCommon(
"checkpoint_limit", "EXPERIMENTAL: The maximum number of messages of the same topic and partition that can be processed at a given time. Increasing this limit enables parallel processing and batching at the output level to work on individual partitions. Any given offset will not be committed unless all messages under that offset are delivered in order to preserve at least once delivery guarantees.",
@@ -462,6 +463,7 @@ func (k *kafkaReader) ConnectWithContext(ctx context.Context) error {

config := sarama.NewConfig()
config.ClientID = k.conf.ClientID
config.RackID = k.conf.RackID
config.Net.DialTimeout = time.Second
config.Version = k.version
config.Consumer.Return.Errors = true
1 change: 1 addition & 0 deletions lib/input/kafka_balanced.go
@@ -64,6 +64,7 @@ You can access these metadata fields using
sasl.FieldSpec(),
docs.FieldCommon("topics", "A list of topics to consume from. If an item of the list contains commas it will be expanded into multiple topics.").Array(),
docs.FieldCommon("client_id", "An identifier for the client connection."),
docs.FieldAdvanced("rack_id", "A rack identifier for this client."),
docs.FieldCommon("consumer_group", "An identifier for the consumer group of the connection."),
docs.FieldAdvanced("start_from_oldest", "If an offset is not found for a topic partition, determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset."),
docs.FieldAdvanced("commit_period", "The period of time between each commit of the current partition offsets. Offsets are always committed during shutdown."),
2 changes: 2 additions & 0 deletions lib/input/reader/kafka.go
@@ -27,6 +27,7 @@ type KafkaConfig struct {
Addresses []string `json:"addresses" yaml:"addresses"`
Topics []string `json:"topics" yaml:"topics"`
ClientID string `json:"client_id" yaml:"client_id"`
RackID string `json:"rack_id" yaml:"rack_id"`
ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"`
Group KafkaBalancedGroupConfig `json:"group" yaml:"group"`
CommitPeriod string `json:"commit_period" yaml:"commit_period"`
@@ -60,6 +61,7 @@ func NewKafkaConfig() KafkaConfig {
Addresses: []string{"localhost:9092"},
Topics: []string{},
ClientID: "benthos_kafka_input",
RackID: "",
ConsumerGroup: "benthos_consumer_group",
Group: NewKafkaBalancedGroupConfig(),
CommitPeriod: "1s",
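The struct tags above are what connect the new field to user configs: a `rack_id` key in YAML or JSON lands on `RackID`, and the constructor's empty-string default keeps the field optional. A self-contained sketch of that round trip, using `gopkg.in/yaml.v3` as a stand-in for Benthos' actual config parsing:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// A trimmed copy of the KafkaConfig fields relevant to this change.
type KafkaConfig struct {
	Addresses []string `yaml:"addresses"`
	ClientID  string   `yaml:"client_id"`
	RackID    string   `yaml:"rack_id"`
}

func main() {
	raw := []byte(`
addresses: [ "localhost:9092" ]
client_id: benthos_kafka_input
rack_id: rack-a
`)
	var conf KafkaConfig
	if err := yaml.Unmarshal(raw, &conf); err != nil {
		panic(err)
	}
	fmt.Println(conf.RackID) // prints: rack-a
}
```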
3 changes: 3 additions & 0 deletions lib/input/reader/kafka_balanced.go
@@ -44,6 +44,7 @@ func NewKafkaBalancedGroupConfig() KafkaBalancedGroupConfig {
type KafkaBalancedConfig struct {
Addresses []string `json:"addresses" yaml:"addresses"`
ClientID string `json:"client_id" yaml:"client_id"`
RackID string `json:"rack_id" yaml:"rack_id"`
ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"`
Group KafkaBalancedGroupConfig `json:"group" yaml:"group"`
CommitPeriod string `json:"commit_period" yaml:"commit_period"`
@@ -65,6 +66,7 @@ func NewKafkaBalancedConfig() KafkaBalancedConfig {
return KafkaBalancedConfig{
Addresses: []string{"localhost:9092"},
ClientID: "benthos_kafka_input",
RackID: "",
ConsumerGroup: "benthos_consumer_group",
Group: NewKafkaBalancedGroupConfig(),
CommitPeriod: "1s",
@@ -274,6 +276,7 @@ func (k *KafkaBalanced) Connect() error {

config := sarama.NewConfig()
config.ClientID = k.conf.ClientID
config.RackID = k.conf.RackID
config.Net.DialTimeout = time.Second
config.Version = k.version
config.Consumer.Return.Errors = true
1 change: 1 addition & 0 deletions lib/input/reader/kafka_cg.go
@@ -294,6 +294,7 @@ func (k *KafkaCG) ConnectWithContext(ctx context.Context) error {

config := sarama.NewConfig()
config.ClientID = k.conf.ClientID
config.RackID = k.conf.RackID
config.Net.DialTimeout = time.Second
config.Version = k.version
config.Consumer.Return.Errors = true
1 change: 1 addition & 0 deletions lib/output/kafka.go
@@ -45,6 +45,7 @@ However, this also means that manual intervention will eventually be required in
sasl.FieldSpec(),
docs.FieldCommon("topic", "The topic to publish messages to.").IsInterpolated(),
docs.FieldCommon("client_id", "An identifier for the client connection."),
docs.FieldAdvanced("rack_id", "A rack identifier for this client."),
docs.FieldCommon("key", "The key to publish messages with.").IsInterpolated(),
docs.FieldCommon("partitioner", "The partitioning algorithm to use.").HasOptions("fnv1a_hash", "murmur2_hash", "random", "round_robin", "manual"),
docs.FieldAdvanced("partition", "The manually-specified partition to publish messages to, relevant only when the field `partitioner` is set to `manual`. Must be able to parse as a 32-bit integer.").IsInterpolated(),
3 changes: 3 additions & 0 deletions lib/output/writer/kafka.go
@@ -31,6 +31,7 @@ import (
type KafkaConfig struct {
Addresses []string `json:"addresses" yaml:"addresses"`
ClientID string `json:"client_id" yaml:"client_id"`
RackID string `json:"rack_id" yaml:"rack_id"`
Key string `json:"key" yaml:"key"`
Partitioner string `json:"partitioner" yaml:"partitioner"`
Partition string `json:"partition" yaml:"partition"`
@@ -64,6 +65,7 @@ func NewKafkaConfig() KafkaConfig {
return KafkaConfig{
Addresses: []string{"localhost:9092"},
ClientID: "benthos_kafka_output",
RackID: "",
Key: "",
RoundRobinPartitions: false,
Partitioner: "fnv1a_hash",
@@ -292,6 +294,7 @@ func (k *Kafka) Connect() error {

config := sarama.NewConfig()
config.ClientID = k.conf.ClientID
config.RackID = k.conf.RackID

config.Version = k.version

9 changes: 9 additions & 0 deletions website/docs/components/inputs/kafka.md
@@ -66,6 +66,7 @@ input:
token_key: ""
consumer_group: benthos_consumer_group
client_id: benthos_kafka_input
rack_id: ""
start_from_oldest: true
checkpoint_limit: 1
commit_period: 1s
@@ -373,6 +374,14 @@ An identifier for the client connection.
Type: `string`
Default: `"benthos_kafka_input"`

### `rack_id`

A rack identifier for this client.


Type: `string`
Default: `""`

### `start_from_oldest`

If an offset is not found for a topic partition, determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset.
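Because the default is an empty string, rack awareness stays disabled unless explicitly configured. Since the correct rack usually varies per deployment zone, one pattern is to resolve it from the environment at startup; a hedged sketch follows, where the `KAFKA_RACK_ID` variable name is an invented example rather than a Benthos convention. Benthos configs also support environment variable interpolation, so the same effect can be had with `rack_id: ${KAFKA_RACK_ID}` directly in the YAML.

```go
package main

import (
	"fmt"
	"os"

	"github.com/Shopify/sarama"
)

func main() {
	// Fall back to no rack when the variable is unset, matching the
	// component default of "".
	rack := os.Getenv("KAFKA_RACK_ID") // hypothetical variable name

	config := sarama.NewConfig()
	config.RackID = rack
	fmt.Printf("rack_id resolved to %q\n", config.RackID)
}
```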
9 changes: 9 additions & 0 deletions website/docs/components/inputs/kafka_balanced.md
@@ -75,6 +75,7 @@ input:
topics:
- benthos_stream
client_id: benthos_kafka_input
rack_id: ""
consumer_group: benthos_consumer_group
start_from_oldest: true
commit_period: 1s
@@ -363,6 +364,14 @@ An identifier for the client connection.
Type: `string`
Default: `"benthos_kafka_input"`

### `rack_id`

A rack identifier for this client.


Type: `string`
Default: `""`

### `consumer_group`

An identifier for the consumer group of the connection.
9 changes: 9 additions & 0 deletions website/docs/components/outputs/kafka.md
@@ -75,6 +75,7 @@ output:
token_key: ""
topic: benthos_stream
client_id: benthos_kafka_output
rack_id: ""
key: ""
partitioner: fnv1a_hash
partition: ""
@@ -360,6 +361,14 @@ An identifier for the client connection.
Type: `string`
Default: `"benthos_kafka_output"`

### `rack_id`

A rack identifier for this client.


Type: `string`
Default: `""`

### `key`

The key to publish messages with.
