diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index ce1eadb14a55..a6e0e499f1b2 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -201,18 +201,6 @@ type saramaConfig struct { Version string `json:",omitempty"` } -func (c saramaConfig) Validate() error { - // If Flush.Bytes > 0 or Flush.Messages > 1 without - // Flush.Frequency, sarama may wait forever to flush the - // messages to Kafka. We want to guard against such - // configurations to ensure that we don't get into a situation - // where our call to Flush() would block forever. - if (c.Flush.Bytes > 0 || c.Flush.Messages > 1) && c.Flush.Frequency == 0 { - return errors.New("Flush.Frequency must be > 0 when Flush.Bytes > 0 or Flush.Messages > 1") - } - return nil -} - func defaultSaramaConfig() *saramaConfig { config := &saramaConfig{} @@ -1157,13 +1145,13 @@ func buildKafkaConfig( "failed to parse sarama config; check %s option", changefeedbase.OptKafkaSinkConfig) } - if err := saramaCfg.Validate(); err != nil { - return nil, errors.Wrap(err, "invalid sarama configuration") - } - if err := saramaCfg.Apply(config); err != nil { return nil, errors.Wrap(err, "failed to apply kafka client configuration") } + + if err := config.Validate(); err != nil { + return nil, errors.Wrap(err, "invalid sarama configuration") + } return config, nil } diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go index 076742a43d9c..1963ae4ab157 100644 --- a/pkg/ccl/changefeedccl/sink_test.go +++ b/pkg/ccl/changefeedccl/sink_test.go @@ -623,27 +623,31 @@ func TestSaramaConfigOptionParsing(t *testing.T) { }) t.Run("validate returns nil for valid flush configuration", func(t *testing.T) { opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1000, "Frequency": "1s"}}`) - - cfg, err := getSaramaConfig(opts) + config := sarama.NewConfig() + saramaCfg, _ := getSaramaConfig(opts) + err := saramaCfg.Apply(config) require.NoError(t, err) - require.NoError(t, cfg.Validate()) + require.NoError(t, config.Validate()) opts = `{"Flush": {"Messages": 1}}` - cfg, err = getSaramaConfig(opts) + saramaCfg, _ = getSaramaConfig(opts) + err = saramaCfg.Apply(config) require.NoError(t, err) - require.NoError(t, cfg.Validate()) + require.NoError(t, config.Validate()) }) t.Run("validate returns error for bad flush configuration", func(t *testing.T) { opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1000}}`) - - cfg, err := getSaramaConfig(opts) + config := sarama.NewConfig() + saramaCfg, _ := getSaramaConfig(opts) + err := saramaCfg.Apply(config) require.NoError(t, err) - require.Error(t, cfg.Validate()) + require.Error(t, config.Validate()) opts = `{"Flush": {"Bytes": 10}}` - cfg, err = getSaramaConfig(opts) + saramaCfg, _ = getSaramaConfig(opts) + err = saramaCfg.Apply(config) require.NoError(t, err) - require.Error(t, cfg.Validate()) + require.Error(t, config.Validate()) }) t.Run("apply parses valid version", func(t *testing.T) { opts := changefeedbase.SinkSpecificJSONConfig(`{"version": "0.8.2.0"}`)