diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index e2b7e4b84581..5ea9cda71e3d 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -186,6 +186,7 @@ func (j *compressionCodec) UnmarshalText(b []byte) error { // from sarama.Config. This facilitates users with limited sarama // configurations. type saramaConfig struct { + ClientID string `json:",omitempty"` // These settings mirror ones in sarama config. // We just tag them w/ JSON annotations. // Flush describes settings specific to producer flushing. @@ -250,7 +251,7 @@ func defaultSaramaConfig() *saramaConfig { // this workaround is the one that's been running in roachtests and I'd want // to test this one more before changing it. config.Flush.MaxMessages = 1000 - + config.ClientID = "CockroachDB" return config } @@ -837,6 +838,8 @@ func (c *saramaConfig) Apply(kafka *sarama.Config) error { kafka.Producer.Flush.Messages = c.Flush.Messages kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency) kafka.Producer.Flush.MaxMessages = c.Flush.MaxMessages + kafka.ClientID = c.ClientID + if c.Version != "" { parsedVersion, err := sarama.ParseKafkaVersion(c.Version) if err != nil { @@ -1092,7 +1095,6 @@ func buildKafkaConfig( return nil, err } config := sarama.NewConfig() - config.ClientID = `CockroachDB` config.Producer.Return.Successes = true config.Producer.Partitioner = newChangefeedPartitioner // Do not fetch metadata for all topics but just for the necessary ones. diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go index 40d44d5ccd85..ba267982a497 100644 --- a/pkg/ccl/changefeedccl/sink_test.go +++ b/pkg/ccl/changefeedccl/sink_test.go @@ -632,6 +632,22 @@ func TestSaramaConfigOptionParsing(t *testing.T) { cfg, err = getSaramaConfig(opts) require.NoError(t, err) require.NoError(t, cfg.Validate()) + + saramaCfg := sarama.NewConfig() + opts = `{"ClientID": "clientID1"}` + cfg, _ = getSaramaConfig(opts) + err = cfg.Apply(saramaCfg) + require.NoError(t, err) + require.NoError(t, cfg.Validate()) + require.NoError(t, saramaCfg.Validate()) + + opts = `{"Flush": {"Messages": 1000, "Frequency": "1s"}, "ClientID": "clientID1"}` + cfg, _ = getSaramaConfig(opts) + err = cfg.Apply(saramaCfg) + require.NoError(t, err) + require.NoError(t, cfg.Validate()) + require.NoError(t, saramaCfg.Validate()) + require.True(t, cfg.ClientID == "clientID1") }) t.Run("validate returns error for bad flush configuration", func(t *testing.T) { opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1000}}`) @@ -644,6 +660,14 @@ func TestSaramaConfigOptionParsing(t *testing.T) { cfg, err = getSaramaConfig(opts) require.NoError(t, err) require.Error(t, cfg.Validate()) + + opts = `{"Version": "0.8.2.0", "ClientID": "bad_client_id*"}` + saramaCfg := sarama.NewConfig() + cfg, _ = getSaramaConfig(opts) + err = cfg.Apply(saramaCfg) + require.NoError(t, err) + require.NoError(t, cfg.Validate()) + require.Error(t, saramaCfg.Validate()) }) t.Run("apply parses valid version", func(t *testing.T) { opts := changefeedbase.SinkSpecificJSONConfig(`{"version": "0.8.2.0"}`)