Skip to content

Commit

Permalink
changefeedccl: allow per changefeed kafka quota config
Browse files Browse the repository at this point in the history
Previously, users were limited to setting a single kafka quota configuration for
cockroachdb which was then applied and restricting all changefeeds. This patch
introduces a new changefeed configuration option, allowing users to define
client id for different changefeeds, allowing users to specify different kafka
quota configurations for different changefeeds. To use it, users can specify a
unique client ID using `kafka_sink_config` and configure different quota
settings on kafka server based on
https://kafka.apache.org/documentation/#quotas.

```
CREATE CHANGEFEED FOR foo WITH kafka_sink_config='{"ClientID": "clientID1"}'
```

Fixes: cockroachdb#92290

Release note: `kafka_sink_config` now supports specifying a different client ID
for different changefeeds, enabling users to define distinct kafka quota
configurations for various changefeeds.
  • Loading branch information
wenyihu6 committed Feb 2, 2024
1 parent dee857b commit 8dd6b33
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func (j *compressionCodec) UnmarshalText(b []byte) error {
}

type saramaConfig struct {
ClientID string `json:",omitempty"`
// These settings mirror ones in sarama config.
// We just tag them w/ JSON annotations.
// Flush describes settings specific to producer flushing.
Expand Down Expand Up @@ -247,7 +248,7 @@ func defaultSaramaConfig() *saramaConfig {
// this workaround is the one that's been running in roachtests and I'd want
// to test this one more before changing it.
config.Flush.MaxMessages = 1000

config.ClientID = "CockroachDB"
return config
}

Expand Down Expand Up @@ -834,6 +835,8 @@ func (c *saramaConfig) Apply(kafka *sarama.Config) error {
kafka.Producer.Flush.Messages = c.Flush.Messages
kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency)
kafka.Producer.Flush.MaxMessages = c.Flush.MaxMessages
kafka.ClientID = c.ClientID

if c.Version != "" {
parsedVersion, err := sarama.ParseKafkaVersion(c.Version)
if err != nil {
Expand Down Expand Up @@ -1089,7 +1092,6 @@ func buildKafkaConfig(
return nil, err
}
config := sarama.NewConfig()
config.ClientID = `CockroachDB`
config.Producer.Return.Successes = true
config.Producer.Partitioner = newChangefeedPartitioner
// Do not fetch metadata for all topics but just for the necessary ones.
Expand Down
10 changes: 10 additions & 0 deletions pkg/ccl/changefeedccl/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,16 @@ func TestSaramaConfigOptionParsing(t *testing.T) {
cfg, err = getSaramaConfig(opts)
require.NoError(t, err)
require.NoError(t, cfg.Validate())

opts = `{"ClientID": "clientID1"}`
cfg, err = getSaramaConfig(opts)
require.NoError(t, err)
require.NoError(t, cfg.Validate())

opts = `{"Flush": {"Messages": 1000, "Frequency": "1s"}, "ClientID": "clientID1"}`
cfg, err = getSaramaConfig(opts)
require.NoError(t, err)
require.NoError(t, cfg.Validate())
})
t.Run("validate returns error for bad flush configuration", func(t *testing.T) {
opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1000}}`)
Expand Down

0 comments on commit 8dd6b33

Please sign in to comment.