From 262eef8ff475517165a3dfd6dd4133dad3a9cbbd Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Wed, 10 Feb 2016 13:23:44 -0500 Subject: [PATCH] Add support for custom offset retention durations to offset manager --- config.go | 10 ++++++++++ offset_manager.go | 19 +++++++++++++++---- offset_manager_test.go | 37 +++++++++++++++++++++++++++++++++++++ 3 files changed, 62 insertions(+), 4 deletions(-) diff --git a/config.go b/config.go index 7cc91e761..308c2bcaf 100644 --- a/config.go +++ b/config.go @@ -183,6 +183,13 @@ type Config struct { // The initial offset to use if no offset was previously committed. // Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest. Initial int64 + + // The retention duration for committed offsets. If zero, disabled + // (in which case the `offsets.retention.minutes` option on the + // broker will be used). Kafka only supports precision up to + // milliseconds; nanoseconds will be truncated. + // (default is 0: disabled). + Retention time.Duration } } @@ -257,6 +264,9 @@ func (c *Config) Validate() error { if c.Consumer.MaxWaitTime%time.Millisecond != 0 { Logger.Println("Consumer.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.") } + if c.Consumer.Offsets.Retention%time.Millisecond != 0 { + Logger.Println("Consumer.Offsets.Retention only supports millisecond precision; nanoseconds will be truncated.") + } if c.ClientID == "sarama" { Logger.Println("ClientID is the default of 'sarama', you should consider setting it to something application-specific.") } diff --git a/offset_manager.go b/offset_manager.go index 0c90f7c21..16fa1dacf 100644 --- a/offset_manager.go +++ b/offset_manager.go @@ -475,10 +475,21 @@ func (bom *brokerOffsetManager) flushToBroker() { } func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest { - r := &OffsetCommitRequest{ - Version: 1, - ConsumerGroup: bom.parent.group, - ConsumerGroupGeneration: GroupGenerationUndefined, + var r *OffsetCommitRequest + if bom.parent.conf.Consumer.Offsets.Retention == 0 { + r = &OffsetCommitRequest{ + Version: 1, + ConsumerGroup: bom.parent.group, + ConsumerGroupGeneration: GroupGenerationUndefined, + } + } else { + r = &OffsetCommitRequest{ + Version: 2, + RetentionTime: int64(bom.parent.conf.Consumer.Offsets.Retention / time.Millisecond), + ConsumerGroup: bom.parent.group, + ConsumerGroupGeneration: GroupGenerationUndefined, + } + } for s := range bom.subscriptions { diff --git a/offset_manager_test.go b/offset_manager_test.go index 00d5fba9e..3bedbea95 100644 --- a/offset_manager_test.go +++ b/offset_manager_test.go @@ -228,6 +228,43 @@ func TestPartitionOffsetManagerMarkOffset(t *testing.T) { coordinator.Close() } +func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) { + om, testClient, broker, coordinator := initOffsetManager(t) + testClient.Config().Consumer.Offsets.Retention = time.Hour + + pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta") + + ocResponse := new(OffsetCommitResponse) + ocResponse.AddError("my_topic", 0, ErrNoError) + handler := func(req *request) (res encoder) { + if req.body.version() != 2 { + t.Errorf("Expected to be using version 2. Actual: %v", req.body.version()) + } + offsetCommitRequest := req.body.(*OffsetCommitRequest) + if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) { + t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime) + } + return ocResponse + } + coordinator.setHandler(handler) + + pom.MarkOffset(100, "modified_meta") + offset, meta := pom.NextOffset() + + if offset != 101 { + t.Errorf("Expected offset 100. Actual: %v", offset) + } + if meta != "modified_meta" { + t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta) + } + + safeClose(t, pom) + safeClose(t, om) + safeClose(t, testClient) + broker.Close() + coordinator.Close() +} + func TestPartitionOffsetManagerCommitErr(t *testing.T) { om, testClient, broker, coordinator := initOffsetManager(t) pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")