diff --git a/offset_manager.go b/offset_manager.go index 332679fd7..bc5beaf50 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -318,9 +318,13 @@ func (om *offsetManager) constructRequest() *OffsetCommitRequest { // request controlled retention was only supported from V2-V4 (it became // broker-only after that) so if the user has set the config options then - // flow those through as retention time on the commit request - if r.Version >= 2 && r.Version < 5 && om.conf.Consumer.Offsets.Retention > 0 { - r.RetentionTime = int64(om.conf.Consumer.Offsets.Retention / time.Millisecond) + // flow those through as retention time on the commit request. + if r.Version >= 2 && r.Version < 5 { + // Map Sarama's default of 0 to Kafka's default of -1 + r.RetentionTime = -1 + if om.conf.Consumer.Offsets.Retention > 0 { + r.RetentionTime = int64(om.conf.Consumer.Offsets.Retention / time.Millisecond) + } } om.pomsLock.RLock() diff --git a/offset_manager_test.go b/offset_manager_test.go index def296be6..04322fce7 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -2,6 +2,7 @@ package sarama import ( "errors" + "fmt" "sync/atomic" "testing" "time" @@ -572,3 +573,63 @@ func TestAbortPartitionOffsetManager(t *testing.T) { safeClose(t, om) safeClose(t, testClient) } + +// Validate that the constructRequest() method correctly maps Sarama's default for +// Config.Consumer.Offsets.Retention to the equivalent Kafka value. +func TestConstructRequestRetentionTime(t *testing.T) { + expectedRetention := func(version KafkaVersion, retention time.Duration) int64 { + switch { + case version.IsAtLeast(V2_1_0_0): + // version >= 2.1.0: Client specified retention time isn't supported in the + // offset commit request anymore, thus the retention time field set in the + // OffsetCommitRequest struct should be 0. + return 0 + case version.IsAtLeast(V0_9_0_0): + // 0.9.0 <= version < 2.1.0: Retention time *is* supported in the offset commit + // request. Sarama's default retention times (0) must be mapped to the Kafka + // default (-1). Non-zero Sarama times are converted from time.Duration to + // an int64 millisecond value. + if retention > 0 { + return int64(retention / time.Millisecond) + } else { + return -1 + } + default: + // version < 0.9.0: Client specified retention time is not supported in the offset + // commit request, thus the retention time field set in the OffsetCommitRequest + // struct should be 0. + return 0 + } + } + + for _, version := range SupportedVersions { + for _, retention := range []time.Duration{0, time.Millisecond} { + name := fmt.Sprintf("version %s retention: %s", version, retention) + t.Run(name, func(t *testing.T) { + // Perform necessary setup for calling the constructRequest() method. This + // test-case only cares about the code path that sets the retention time + // field in the returned request struct. + conf := NewTestConfig() + conf.Version = version + conf.Consumer.Offsets.Retention = retention + om := &offsetManager{ + conf: conf, + poms: map[string]map[int32]*partitionOffsetManager{ + "topic": { + 0: { + dirty: true, + }, + }, + }, + } + + req := om.constructRequest() + + expectedRetention := expectedRetention(version, retention) + if req.RetentionTime != expectedRetention { + t.Errorf("expected retention time %d, got: %d", expectedRetention, req.RetentionTime) + } + }) + } + } +}