From 58de93edc617de418024a35f15c2be2f191c1d99 Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Wed, 7 Feb 2024 10:54:40 -0500 Subject: [PATCH] changefeedccl: better validation check for sarama config Previously, we implement our own validation check for sarama config which may have missed certain cases. This patch changes this by utilizing sarama's own exported validation function. Release note: none Epic: none --- pkg/ccl/changefeedccl/sink_kafka.go | 20 ++++---------------- pkg/ccl/changefeedccl/sink_test.go | 24 ++++++++++++++---------- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index ce1eadb14a55..a6e0e499f1b2 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -201,18 +201,6 @@ type saramaConfig struct { Version string `json:",omitempty"` } -func (c saramaConfig) Validate() error { - // If Flush.Bytes > 0 or Flush.Messages > 1 without - // Flush.Frequency, sarama may wait forever to flush the - // messages to Kafka. We want to guard against such - // configurations to ensure that we don't get into a situation - // where our call to Flush() would block forever. - if (c.Flush.Bytes > 0 || c.Flush.Messages > 1) && c.Flush.Frequency == 0 { - return errors.New("Flush.Frequency must be > 0 when Flush.Bytes > 0 or Flush.Messages > 1") - } - return nil -} - func defaultSaramaConfig() *saramaConfig { config := &saramaConfig{} @@ -1157,13 +1145,13 @@ func buildKafkaConfig( "failed to parse sarama config; check %s option", changefeedbase.OptKafkaSinkConfig) } - if err := saramaCfg.Validate(); err != nil { - return nil, errors.Wrap(err, "invalid sarama configuration") - } - if err := saramaCfg.Apply(config); err != nil { return nil, errors.Wrap(err, "failed to apply kafka client configuration") } + + if err := config.Validate(); err != nil { + return nil, errors.Wrap(err, "invalid sarama configuration") + } return config, nil } diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go index 076742a43d9c..1963ae4ab157 100644 --- a/pkg/ccl/changefeedccl/sink_test.go +++ b/pkg/ccl/changefeedccl/sink_test.go @@ -623,27 +623,31 @@ func TestSaramaConfigOptionParsing(t *testing.T) { }) t.Run("validate returns nil for valid flush configuration", func(t *testing.T) { opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1000, "Frequency": "1s"}}`) - - cfg, err := getSaramaConfig(opts) + config := sarama.NewConfig() + saramaCfg, _ := getSaramaConfig(opts) + err := saramaCfg.Apply(config) require.NoError(t, err) - require.NoError(t, cfg.Validate()) + require.NoError(t, config.Validate()) opts = `{"Flush": {"Messages": 1}}` - cfg, err = getSaramaConfig(opts) + saramaCfg, _ = getSaramaConfig(opts) + err = saramaCfg.Apply(config) require.NoError(t, err) - require.NoError(t, cfg.Validate()) + require.NoError(t, config.Validate()) }) t.Run("validate returns error for bad flush configuration", func(t *testing.T) { opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1000}}`) - - cfg, err := getSaramaConfig(opts) + config := sarama.NewConfig() + saramaCfg, _ := getSaramaConfig(opts) + err := saramaCfg.Apply(config) require.NoError(t, err) - require.Error(t, cfg.Validate()) + require.Error(t, config.Validate()) opts = `{"Flush": {"Bytes": 10}}` - cfg, err = getSaramaConfig(opts) + saramaCfg, _ = getSaramaConfig(opts) + err = saramaCfg.Apply(config) require.NoError(t, err) - require.Error(t, cfg.Validate()) + require.Error(t, config.Validate()) }) t.Run("apply parses valid version", func(t *testing.T) { opts := changefeedbase.SinkSpecificJSONConfig(`{"version": "0.8.2.0"}`)