Skip to content

Commit

Permalink
changefeedccl: better validation check for sarama config
Browse files Browse the repository at this point in the history
Previously, we implement our own validation check for sarama config which may
have missed certain cases. This patch changes this by utilizing sarama's own
exported validation function.

Release note: none
Epic: none
  • Loading branch information
wenyihu6 committed Feb 7, 2024
1 parent 3ff9177 commit 58de93e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 26 deletions.
20 changes: 4 additions & 16 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,18 +201,6 @@ type saramaConfig struct {
Version string `json:",omitempty"`
}

func (c saramaConfig) Validate() error {
// If Flush.Bytes > 0 or Flush.Messages > 1 without
// Flush.Frequency, sarama may wait forever to flush the
// messages to Kafka. We want to guard against such
// configurations to ensure that we don't get into a situation
// where our call to Flush() would block forever.
if (c.Flush.Bytes > 0 || c.Flush.Messages > 1) && c.Flush.Frequency == 0 {
return errors.New("Flush.Frequency must be > 0 when Flush.Bytes > 0 or Flush.Messages > 1")
}
return nil
}

func defaultSaramaConfig() *saramaConfig {
config := &saramaConfig{}

Expand Down Expand Up @@ -1157,13 +1145,13 @@ func buildKafkaConfig(
"failed to parse sarama config; check %s option", changefeedbase.OptKafkaSinkConfig)
}

if err := saramaCfg.Validate(); err != nil {
return nil, errors.Wrap(err, "invalid sarama configuration")
}

if err := saramaCfg.Apply(config); err != nil {
return nil, errors.Wrap(err, "failed to apply kafka client configuration")
}

if err := config.Validate(); err != nil {
return nil, errors.Wrap(err, "invalid sarama configuration")
}
return config, nil
}

Expand Down
24 changes: 14 additions & 10 deletions pkg/ccl/changefeedccl/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,27 +623,31 @@ func TestSaramaConfigOptionParsing(t *testing.T) {
})
t.Run("validate returns nil for valid flush configuration", func(t *testing.T) {
opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1000, "Frequency": "1s"}}`)

cfg, err := getSaramaConfig(opts)
config := sarama.NewConfig()
saramaCfg, _ := getSaramaConfig(opts)
err := saramaCfg.Apply(config)
require.NoError(t, err)
require.NoError(t, cfg.Validate())
require.NoError(t, config.Validate())

opts = `{"Flush": {"Messages": 1}}`
cfg, err = getSaramaConfig(opts)
saramaCfg, _ = getSaramaConfig(opts)
err = saramaCfg.Apply(config)
require.NoError(t, err)
require.NoError(t, cfg.Validate())
require.NoError(t, config.Validate())
})
t.Run("validate returns error for bad flush configuration", func(t *testing.T) {
opts := changefeedbase.SinkSpecificJSONConfig(`{"Flush": {"Messages": 1000}}`)

cfg, err := getSaramaConfig(opts)
config := sarama.NewConfig()
saramaCfg, _ := getSaramaConfig(opts)
err := saramaCfg.Apply(config)
require.NoError(t, err)
require.Error(t, cfg.Validate())
require.Error(t, config.Validate())

opts = `{"Flush": {"Bytes": 10}}`
cfg, err = getSaramaConfig(opts)
saramaCfg, _ = getSaramaConfig(opts)
err = saramaCfg.Apply(config)
require.NoError(t, err)
require.Error(t, cfg.Validate())
require.Error(t, config.Validate())
})
t.Run("apply parses valid version", func(t *testing.T) {
opts := changefeedbase.SinkSpecificJSONConfig(`{"version": "0.8.2.0"}`)
Expand Down

0 comments on commit 58de93e

Please sign in to comment.