Skip to content

Commit

Permalink
changefeedccl: allow per changefeed kafka quota config
Browse files Browse the repository at this point in the history
Previously, users were limited to setting a single kafka quota configuration for
cockroachdb which was then applied and restricting all changefeeds. This patch
introduces a new changefeed configuration option, allowing users to define
client id for different changefeeds, allowing users to specify different kafka
quota configurations for different changefeeds. To use it, users can specify a
unique client ID using `kafka_sink_config` and configure different quota
settings on kafka server based on
https://kafka.apache.org/documentation/#quotas.

```
CREATE CHANGEFEED FOR foo WITH kafka_sink_config='{"ClientID": "clientID1"}'
```

Note that Fixes: cockroachdb#92290

Release note: `kafka_sink_config` now supports specifying a different client ID
for different changefeeds, enabling users to define distinct kafka quota
configurations for various changefeeds.

For any kafka versions >= V1_0_0_0 ([KIP-190: Handle client-ids consistently
between clients and
brokers](https://cwiki.apache.org/confluence/display/KAFKA/KIP-190%3A+Handle+client-ids+consistently+between+clients+and+brokers)),
any string can be used as client ID. For earlier kafka versions, clientID can
only contain characters [A-Za-z0-9._-] are acceptable.

For example,
```
CREATE CHANGEFEED FOR ... WITH kafka_sink_config='{"ClientID": "clientID1"}'
```
  • Loading branch information
wenyihu6 committed Feb 7, 2024
1 parent 58de93e commit 66f33cb
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func (j *compressionCodec) UnmarshalText(b []byte) error {
}

type saramaConfig struct {
ClientID string `json:",omitempty"`
// These settings mirror ones in sarama config.
// We just tag them w/ JSON annotations.
// Flush describes settings specific to producer flushing.
Expand Down Expand Up @@ -235,7 +236,7 @@ func defaultSaramaConfig() *saramaConfig {
// this workaround is the one that's been running in roachtests and I'd want
// to test this one more before changing it.
config.Flush.MaxMessages = 1000

config.ClientID = "CockroachDB"
return config
}

Expand Down Expand Up @@ -822,6 +823,8 @@ func (c *saramaConfig) Apply(kafka *sarama.Config) error {
kafka.Producer.Flush.Messages = c.Flush.Messages
kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency)
kafka.Producer.Flush.MaxMessages = c.Flush.MaxMessages
kafka.ClientID = c.ClientID

if c.Version != "" {
parsedVersion, err := sarama.ParseKafkaVersion(c.Version)
if err != nil {
Expand Down Expand Up @@ -1077,7 +1080,6 @@ func buildKafkaConfig(
return nil, err
}
config := sarama.NewConfig()
config.ClientID = `CockroachDB`
config.Producer.Return.Successes = true
config.Producer.Partitioner = newChangefeedPartitioner
// Do not fetch metadata for all topics but just for the necessary ones.
Expand Down
13 changes: 13 additions & 0 deletions pkg/ccl/changefeedccl/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,20 @@ func TestSaramaConfigOptionParsing(t *testing.T) {
saramaCfg, _ = getSaramaConfig(opts)
err = saramaCfg.Apply(config)
require.NoError(t, err)
require.NoError(t, cfg.Validate())

opts = `{"ClientID": "clientID1"}`
saramaCfg, _ = getSaramaConfig(opts)
err = saramaCfg.Apply(config)
require.NoError(t, err)
require.NoError(t, config.Validate())

opts = `{"Flush": {"Messages": 1000, "Frequency": "1s"}, "ClientID": "clientID1"}`
saramaCfg, _ = getSaramaConfig(opts)
err = saramaCfg.Apply(config)
require.NoError(t, err)
require.NoError(t, config.Validate())
require.True(t, config.ClientID == "clientID1")
})
t.Run("validate returns error for bad flush configuration", func(t *testing.T) {
opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1000}}`)
Expand Down

0 comments on commit 66f33cb

Please sign in to comment.