Add rack_id field for Kafka components #878

Merged: 2 commits, Sep 16, 2021
CHANGELOG.md (1 addition, 0 deletions)

```diff
@@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file.
 
 - Fields `cache_control`, `content_disposition`, `content_language` and `website_redirect_location` added to the `aws_s3` output.
 - Field `cors.enabled` and `cors.allowed_origins` added to the server wide `http` config.
+- For Kafka components the config now supports the `rack_id` field which may contain a rack identifier for the Kafka client.
 - Allow mapping imports in Bloblang environments to be disabled.
 
 ### Fixed
```
lib/input/kafka.go (2 additions, 0 deletions)

```diff
@@ -74,6 +74,7 @@ You can access these metadata fields using [function interpolation](/docs/config
 sasl.FieldSpec(),
 docs.FieldCommon("consumer_group", "An identifier for the consumer group of the connection. This field can be explicitly made empty in order to disable stored offsets for the consumed topic partitions."),
 docs.FieldCommon("client_id", "An identifier for the client connection."),
+docs.FieldAdvanced("rack_id", "A rack identifier for this client."),
 docs.FieldAdvanced("start_from_oldest", "If an offset is not found for a topic partition, determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset."),
 docs.FieldCommon(
 "checkpoint_limit", "EXPERIMENTAL: The maximum number of messages of the same topic and partition that can be processed at a given time. Increasing this limit enables parallel processing and batching at the output level to work on individual partitions. Any given offset will not be committed unless all messages under that offset are delivered in order to preserve at least once delivery guarantees.",
@@ -462,6 +463,7 @@ func (k *kafkaReader) ConnectWithContext(ctx context.Context) error {
 
 config := sarama.NewConfig()
 config.ClientID = k.conf.ClientID
+config.RackID = k.conf.RackID
 config.Net.DialTimeout = time.Second
 config.Version = k.version
 config.Consumer.Return.Errors = true
```
lib/input/kafka_balanced.go (1 addition, 0 deletions)

```diff
@@ -64,6 +64,7 @@ You can access these metadata fields using
 sasl.FieldSpec(),
 docs.FieldCommon("topics", "A list of topics to consume from. If an item of the list contains commas it will be expanded into multiple topics.").Array(),
 docs.FieldCommon("client_id", "An identifier for the client connection."),
+docs.FieldAdvanced("rack_id", "A rack identifier for this client."),
 docs.FieldCommon("consumer_group", "An identifier for the consumer group of the connection."),
 docs.FieldAdvanced("start_from_oldest", "If an offset is not found for a topic partition, determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset."),
 docs.FieldAdvanced("commit_period", "The period of time between each commit of the current partition offsets. Offsets are always committed during shutdown."),
```
lib/input/reader/kafka.go (2 additions, 0 deletions)

```diff
@@ -27,6 +27,7 @@ type KafkaConfig struct {
 Addresses []string `json:"addresses" yaml:"addresses"`
 Topics []string `json:"topics" yaml:"topics"`
 ClientID string `json:"client_id" yaml:"client_id"`
+RackID string `json:"rack_id" yaml:"rack_id"`
 ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"`
 Group KafkaBalancedGroupConfig `json:"group" yaml:"group"`
 CommitPeriod string `json:"commit_period" yaml:"commit_period"`
@@ -60,6 +61,7 @@ func NewKafkaConfig() KafkaConfig {
 Addresses: []string{"localhost:9092"},
 Topics: []string{},
 ClientID: "benthos_kafka_input",
+RackID: "",
 ConsumerGroup: "benthos_consumer_group",
 Group: NewKafkaBalancedGroupConfig(),
 CommitPeriod: "1s",
```
lib/input/reader/kafka_balanced.go (3 additions, 0 deletions)

```diff
@@ -44,6 +44,7 @@ func NewKafkaBalancedGroupConfig() KafkaBalancedGroupConfig {
 type KafkaBalancedConfig struct {
 Addresses []string `json:"addresses" yaml:"addresses"`
 ClientID string `json:"client_id" yaml:"client_id"`
+RackID string `json:"rack_id" yaml:"rack_id"`
 ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"`
 Group KafkaBalancedGroupConfig `json:"group" yaml:"group"`
 CommitPeriod string `json:"commit_period" yaml:"commit_period"`
@@ -65,6 +66,7 @@ func NewKafkaBalancedConfig() KafkaBalancedConfig {
 return KafkaBalancedConfig{
 Addresses: []string{"localhost:9092"},
 ClientID: "benthos_kafka_input",
+RackID: "",
 ConsumerGroup: "benthos_consumer_group",
 Group: NewKafkaBalancedGroupConfig(),
 CommitPeriod: "1s",
@@ -274,6 +276,7 @@ func (k *KafkaBalanced) Connect() error {
 
 config := sarama.NewConfig()
 config.ClientID = k.conf.ClientID
+config.RackID = k.conf.RackID
 config.Net.DialTimeout = time.Second
 config.Version = k.version
 config.Consumer.Return.Errors = true
```
lib/input/reader/kafka_cg.go (1 addition, 0 deletions)

```diff
@@ -294,6 +294,7 @@ func (k *KafkaCG) ConnectWithContext(ctx context.Context) error {
 
 config := sarama.NewConfig()
 config.ClientID = k.conf.ClientID
+config.RackID = k.conf.RackID
 config.Net.DialTimeout = time.Second
 config.Version = k.version
 config.Consumer.Return.Errors = true
```
lib/output/kafka.go (1 addition, 0 deletions)

```diff
@@ -45,6 +45,7 @@ However, this also means that manual intervention will eventually be required in
 sasl.FieldSpec(),
 docs.FieldCommon("topic", "The topic to publish messages to.").IsInterpolated(),
 docs.FieldCommon("client_id", "An identifier for the client connection."),
+docs.FieldAdvanced("rack_id", "A rack identifier for this client."),
 docs.FieldCommon("key", "The key to publish messages with.").IsInterpolated(),
 docs.FieldCommon("partitioner", "The partitioning algorithm to use.").HasOptions("fnv1a_hash", "murmur2_hash", "random", "round_robin", "manual"),
 docs.FieldAdvanced("partition", "The manually-specified partition to publish messages to, relevant only when the field `partitioner` is set to `manual`. Must be able to parse as a 32-bit integer.").IsInterpolated(),
```
lib/output/writer/kafka.go (3 additions, 0 deletions)

```diff
@@ -31,6 +31,7 @@ import (
 type KafkaConfig struct {
 Addresses []string `json:"addresses" yaml:"addresses"`
 ClientID string `json:"client_id" yaml:"client_id"`
+RackID string `json:"rack_id" yaml:"rack_id"`
 Key string `json:"key" yaml:"key"`
 Partitioner string `json:"partitioner" yaml:"partitioner"`
 Partition string `json:"partition" yaml:"partition"`
@@ -64,6 +65,7 @@ func NewKafkaConfig() KafkaConfig {
 return KafkaConfig{
 Addresses: []string{"localhost:9092"},
 ClientID: "benthos_kafka_output",
+RackID: "",
 Key: "",
 RoundRobinPartitions: false,
 Partitioner: "fnv1a_hash",
@@ -292,6 +294,7 @@ func (k *Kafka) Connect() error {
 
 config := sarama.NewConfig()
 config.ClientID = k.conf.ClientID
+config.RackID = k.conf.RackID
 
 config.Version = k.version
 
```
website/docs/components/inputs/kafka.md (9 additions, 0 deletions)

```diff
@@ -66,6 +66,7 @@ input:
 token_key: ""
 consumer_group: benthos_consumer_group
 client_id: benthos_kafka_input
+rack_id: ""
 start_from_oldest: true
 checkpoint_limit: 1
 commit_period: 1s
@@ -373,6 +374,14 @@ An identifier for the client connection.
 Type: `string`
 Default: `"benthos_kafka_input"`
 
+### `rack_id`
+
+A rack identifier for this client.
+
+
+Type: `string`
+Default: `""`
+
 ### `start_from_oldest`
 
 If an offset is not found for a topic partition, determines whether to consume from the oldest available offset, otherwise messages are consumed from the latest offset.
```
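For reference, here is what the new field might look like in use on the `kafka` input. This is a minimal sketch, not taken from the PR: the broker address, topic, and rack value are illustrative, and the comment reflects the usual role of a Kafka client rack identifier (matching the brokers' `broker.rack` so fetches can be served from a nearby replica, per KIP-392).

```yaml
input:
  kafka:
    addresses:
      - localhost:9092
    topics:
      - benthos_stream
    consumer_group: benthos_consumer_group
    client_id: benthos_kafka_input
    # Illustrative value: typically the consumer's rack or availability
    # zone, matching the brokers' broker.rack setting (KIP-392).
    rack_id: us-east-1a
```

Since `rack_id` defaults to an empty string, existing configs are unaffected.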
website/docs/components/inputs/kafka_balanced.md (9 additions, 0 deletions)

```diff
@@ -75,6 +75,7 @@ input:
 topics:
 - benthos_stream
 client_id: benthos_kafka_input
+rack_id: ""
 consumer_group: benthos_consumer_group
 start_from_oldest: true
 commit_period: 1s
@@ -363,6 +364,14 @@ An identifier for the client connection.
 Type: `string`
 Default: `"benthos_kafka_input"`
 
+### `rack_id`
+
+A rack identifier for this client.
+
+
+Type: `string`
+Default: `""`
+
 ### `consumer_group`
 
 An identifier for the consumer group of the connection.
```
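The same sketch applies to the `kafka_balanced` input, where the field sits between `client_id` and `consumer_group` (values again illustrative):

```yaml
input:
  kafka_balanced:
    addresses:
      - localhost:9092
    topics:
      - benthos_stream
    client_id: benthos_kafka_input
    rack_id: us-east-1a # illustrative rack/zone value
    consumer_group: benthos_consumer_group
```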
website/docs/components/outputs/kafka.md (9 additions, 0 deletions)

```diff
@@ -75,6 +75,7 @@ output:
 token_key: ""
 topic: benthos_stream
 client_id: benthos_kafka_output
+rack_id: ""
 key: ""
 partitioner: fnv1a_hash
 partition: ""
@@ -360,6 +361,14 @@ An identifier for the client connection.
 Type: `string`
 Default: `"benthos_kafka_output"`
 
+### `rack_id`
+
+A rack identifier for this client.
+
+
+Type: `string`
+Default: `""`
+
 ### `key`
 
 The key to publish messages with.
```
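And a corresponding sketch for the `kafka` output (values illustrative). Note that KIP-392 rack-aware fetching is a consumer-side optimisation; on the producer the identifier is simply passed through to the underlying client, as the wiring in `lib/output/writer/kafka.go` above shows:

```yaml
output:
  kafka:
    addresses:
      - localhost:9092
    topic: benthos_stream
    client_id: benthos_kafka_output
    rack_id: us-east-1a # illustrative rack/zone value
    key: ""
    partitioner: fnv1a_hash
```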