From 66f33cbcbed38df0fa95c5f17176712526f7f79a Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Mon, 29 Jan 2024 13:43:02 -0500 Subject: [PATCH] changefeedccl: allow per changefeed kafka quota config Previously, users were limited to setting a single kafka quota configuration for cockroachdb, which was then applied to, and restricted, all changefeeds. This patch introduces a new changefeed configuration option that lets users define a client ID per changefeed, so that different kafka quota configurations can be specified for different changefeeds. To use it, users can specify a unique client ID using `kafka_sink_config` and configure different quota settings on the kafka server based on https://kafka.apache.org/documentation/#quotas. ``` CREATE CHANGEFEED FOR foo WITH kafka_sink_config='{"ClientID": "clientID1"}' ``` Note that the set of characters accepted in a client ID depends on the kafka version; see the release note below. Fixes: https://github.com/cockroachdb/cockroach/issues/92290 Release note: `kafka_sink_config` now supports specifying a different client ID for different changefeeds, enabling users to define distinct kafka quota configurations for various changefeeds. For any kafka versions >= V1_0_0_0 ([KIP-190: Handle client-ids consistently between clients and brokers](https://cwiki.apache.org/confluence/display/KAFKA/KIP-190%3A+Handle+client-ids+consistently+between+clients+and+brokers)), any string can be used as client ID. For earlier kafka versions, the client ID can only contain the characters [A-Za-z0-9._-]. For example, ``` CREATE CHANGEFEED FOR ... 
WITH kafka_sink_config='{"ClientID": "clientID1"}' ``` --- pkg/ccl/changefeedccl/sink_kafka.go | 6 ++++-- pkg/ccl/changefeedccl/sink_test.go | 13 +++++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index a6e0e499f1b2..ea4804aabc3d 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -183,6 +183,7 @@ func (j *compressionCodec) UnmarshalText(b []byte) error { } type saramaConfig struct { + ClientID string `json:",omitempty"` // These settings mirror ones in sarama config. // We just tag them w/ JSON annotations. // Flush describes settings specific to producer flushing. @@ -235,7 +236,7 @@ func defaultSaramaConfig() *saramaConfig { // this workaround is the one that's been running in roachtests and I'd want // to test this one more before changing it. config.Flush.MaxMessages = 1000 - + config.ClientID = "CockroachDB" return config } @@ -822,6 +823,8 @@ func (c *saramaConfig) Apply(kafka *sarama.Config) error { kafka.Producer.Flush.Messages = c.Flush.Messages kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency) kafka.Producer.Flush.MaxMessages = c.Flush.MaxMessages + kafka.ClientID = c.ClientID + if c.Version != "" { parsedVersion, err := sarama.ParseKafkaVersion(c.Version) if err != nil { @@ -1077,7 +1080,6 @@ func buildKafkaConfig( return nil, err } config := sarama.NewConfig() - config.ClientID = `CockroachDB` config.Producer.Return.Successes = true config.Producer.Partitioner = newChangefeedPartitioner // Do not fetch metadata for all topics but just for the necessary ones. 
diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go index 1963ae4ab157..7a6c68080be6 100644 --- a/pkg/ccl/changefeedccl/sink_test.go +++ b/pkg/ccl/changefeedccl/sink_test.go @@ -633,7 +633,20 @@ func TestSaramaConfigOptionParsing(t *testing.T) { saramaCfg, _ = getSaramaConfig(opts) err = saramaCfg.Apply(config) require.NoError(t, err) + require.NoError(t, cfg.Validate()) + + opts = `{"ClientID": "clientID1"}` + saramaCfg, _ = getSaramaConfig(opts) + err = saramaCfg.Apply(config) + require.NoError(t, err) + require.NoError(t, config.Validate()) + + opts = `{"Flush": {"Messages": 1000, "Frequency": "1s"}, "ClientID": "clientID1"}` + saramaCfg, _ = getSaramaConfig(opts) + err = saramaCfg.Apply(config) + require.NoError(t, err) require.NoError(t, config.Validate()) + require.True(t, config.ClientID == "clientID1") }) t.Run("validate returns error for bad flush configuration", func(t *testing.T) { opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1000}}`)