Skip to content

Commit

Permalink
changefeedccl: allow per changefeed kafka quota config
Browse files Browse the repository at this point in the history
Previously, users were limited to setting a single kafka quota configuration for
cockroachdb which was then applied and restricting all changefeeds. This patch
introduces a new changefeed configuration option, allowing users to define
client id for different changefeeds, allowing users to specify different kafka
quota configurations for different changefeeds. To use it, users can specify a
unique client ID using `kafka_sink_config` and configure different quota
settings on kafka server based on
https://kafka.apache.org/documentation/#quotas.

```
CREATE CHANGEFEED FOR foo WITH kafka_sink_config='{"ClientID": "clientID1"}'
```

Note that Fixes: cockroachdb#92290

Release note: `kafka_sink_config` now supports specifying a different client ID
for different changefeeds, enabling users to define distinct kafka quota
configurations for various changefeeds.

For any kafka versions >= V1_0_0_0 ([KIP-190: Handle client-ids consistently
between clients and
brokers](https://cwiki.apache.org/confluence/display/KAFKA/KIP-190%3A+Handle+client-ids+consistently+between+clients+and+brokers)),
any string can be used as client ID. For earlier kafka versions, clientID can
only contain characters [A-Za-z0-9._-] are acceptable.

For example,
```
CREATE CHANGEFEED FOR ... WITH kafka_sink_config='{"ClientID": "clientID1"}'
```
  • Loading branch information
wenyihu6 committed Feb 21, 2024
1 parent 036e083 commit 1471273
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ func (j *compressionCodec) UnmarshalText(b []byte) error {
// from sarama.Config. This facilitates users with limited sarama
// configurations.
type saramaConfig struct {
ClientID string `json:",omitempty"`
// These settings mirror ones in sarama config.
// We just tag them w/ JSON annotations.
// Flush describes settings specific to producer flushing.
Expand Down Expand Up @@ -250,7 +251,7 @@ func defaultSaramaConfig() *saramaConfig {
// this workaround is the one that's been running in roachtests and I'd want
// to test this one more before changing it.
config.Flush.MaxMessages = 1000

config.ClientID = "CockroachDB"
return config
}

Expand Down Expand Up @@ -837,6 +838,8 @@ func (c *saramaConfig) Apply(kafka *sarama.Config) error {
kafka.Producer.Flush.Messages = c.Flush.Messages
kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency)
kafka.Producer.Flush.MaxMessages = c.Flush.MaxMessages
kafka.ClientID = c.ClientID

if c.Version != "" {
parsedVersion, err := sarama.ParseKafkaVersion(c.Version)
if err != nil {
Expand Down Expand Up @@ -1092,7 +1095,6 @@ func buildKafkaConfig(
return nil, err
}
config := sarama.NewConfig()
config.ClientID = `CockroachDB`
config.Producer.Return.Successes = true
config.Producer.Partitioner = newChangefeedPartitioner
// Do not fetch metadata for all topics but just for the necessary ones.
Expand Down
24 changes: 24 additions & 0 deletions pkg/ccl/changefeedccl/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,22 @@ func TestSaramaConfigOptionParsing(t *testing.T) {
cfg, err = getSaramaConfig(opts)
require.NoError(t, err)
require.NoError(t, cfg.Validate())

saramaCfg := sarama.NewConfig()
opts = `{"ClientID": "clientID1"}`
cfg, _ = getSaramaConfig(opts)
err = cfg.Apply(saramaCfg)
require.NoError(t, err)
require.NoError(t, cfg.Validate())
require.NoError(t, saramaCfg.Validate())

opts = `{"Flush": {"Messages": 1000, "Frequency": "1s"}, "ClientID": "clientID1"}`
cfg, _ = getSaramaConfig(opts)
err = cfg.Apply(saramaCfg)
require.NoError(t, err)
require.NoError(t, cfg.Validate())
require.NoError(t, saramaCfg.Validate())
require.True(t, cfg.ClientID == "clientID1")
})
t.Run("validate returns error for bad flush configuration", func(t *testing.T) {
opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1000}}`)
Expand All @@ -644,6 +660,14 @@ func TestSaramaConfigOptionParsing(t *testing.T) {
cfg, err = getSaramaConfig(opts)
require.NoError(t, err)
require.Error(t, cfg.Validate())

opts = `{"Version": "0.8.2.0", "ClientID": "bad_client_id*"}`
saramaCfg := sarama.NewConfig()
cfg, _ = getSaramaConfig(opts)
err = cfg.Apply(saramaCfg)
require.NoError(t, err)
require.NoError(t, cfg.Validate())
require.Error(t, saramaCfg.Validate())
})
t.Run("apply parses valid version", func(t *testing.T) {
opts := changefeedbase.SinkSpecificJSONConfig(`{"version": "0.8.2.0"}`)
Expand Down

0 comments on commit 1471273

Please sign in to comment.