diff --git a/consumergroup.go b/consumergroup.go index 978253083..1440a5190 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -49,8 +49,8 @@ const ( defaultJoinGroupBackoff = 5 * time.Second // defaultRetentionTime holds the length of time a the consumer group will be - // saved by kafka - defaultRetentionTime = time.Hour * 24 + // saved by kafka. This value tells the broker to use its configured value. + defaultRetentionTime = -1 // defaultPartitionWatchTime contains the amount of time the kafka-go will wait to // query the brokers looking for partition changes. @@ -119,10 +119,13 @@ type ConsumerGroupConfig struct { // Default: 5s JoinGroupBackoff time.Duration - // RetentionTime optionally sets the length of time the consumer group will be saved - // by the broker + // RetentionTime optionally sets the length of time the consumer group will + // be saved by the broker. -1 will disable the setting and leave the + // retention up to the broker's offsets.retention.minutes property. By + // default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >= + // 2.0. // - // Default: 24h + // Default: -1 RetentionTime time.Duration // StartOffset determines from whence the consumer group should begin @@ -212,7 +215,7 @@ func (config *ConsumerGroupConfig) Validate() error { return errors.New(fmt.Sprintf("JoinGroupBackoff out of bounds: %d", config.JoinGroupBackoff)) } - if config.RetentionTime < 0 { + if config.RetentionTime < defaultRetentionTime { return errors.New(fmt.Sprintf("RetentionTime out of bounds: %d", config.RetentionTime)) } diff --git a/consumergroup_test.go b/consumergroup_test.go index d3c4ce315..712cbef6a 100644 --- a/consumergroup_test.go +++ b/consumergroup_test.go @@ -101,11 +101,11 @@ func TestValidateConsumerGroupConfig(t *testing.T) { {config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", SessionTimeout: -1}, errorOccured: true}, {config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: -1}, errorOccured: true}, {config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: -2}, errorOccured: true}, - {config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, RetentionTime: -1}, errorOccured: true}, - {config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, RetentionTime: 1, StartOffset: 123}, errorOccured: true}, - {config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, RetentionTime: 1, PartitionWatchInterval: -1}, errorOccured: true}, - {config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, RetentionTime: 1, PartitionWatchInterval: 1, JoinGroupBackoff: -1}, errorOccured: true}, - {config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, RetentionTime: 1, PartitionWatchInterval: 1, JoinGroupBackoff: 1}, errorOccured: false}, + {config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, RetentionTime: -2}, errorOccured: true}, + {config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, StartOffset: 123}, errorOccured: true}, + {config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, PartitionWatchInterval: -1}, errorOccured: true}, + {config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, PartitionWatchInterval: 1, JoinGroupBackoff: -1}, errorOccured: true}, + {config: ConsumerGroupConfig{Brokers: []string{"broker1"}, Topics: []string{"t1"}, ID: "group1", HeartbeatInterval: 2, SessionTimeout: 2, RebalanceTimeout: 2, PartitionWatchInterval: 1, JoinGroupBackoff: 1}, errorOccured: false}, } for _, test := range tests { err := test.config.Validate()