Skip to content

Commit

Permalink
Add support for custom offset retention durations to offset manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Matvey Arye committed Feb 11, 2016
1 parent 4ba9bba commit 262eef8
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 4 deletions.
10 changes: 10 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,13 @@ type Config struct {
// The initial offset to use if no offset was previously committed.
// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
Initial int64

// The retention duration for committed offsets. If zero, disabled
// (in which case the `offsets.retention.minutes` option on the
// broker will be used). Kafka only supports precision up to
// milliseconds; nanoseconds will be truncated.
// (default is 0: disabled).
Retention time.Duration
}
}

Expand Down Expand Up @@ -257,6 +264,9 @@ func (c *Config) Validate() error {
if c.Consumer.MaxWaitTime%time.Millisecond != 0 {
Logger.Println("Consumer.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
}
if c.Consumer.Offsets.Retention%time.Millisecond != 0 {
Logger.Println("Consumer.Offsets.Retention only supports millisecond precision; nanoseconds will be truncated.")
}
if c.ClientID == "sarama" {
Logger.Println("ClientID is the default of 'sarama', you should consider setting it to something application-specific.")
}
Expand Down
19 changes: 15 additions & 4 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,21 @@ func (bom *brokerOffsetManager) flushToBroker() {
}

func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
r := &OffsetCommitRequest{
Version: 1,
ConsumerGroup: bom.parent.group,
ConsumerGroupGeneration: GroupGenerationUndefined,
var r *OffsetCommitRequest
if bom.parent.conf.Consumer.Offsets.Retention == 0 {
r = &OffsetCommitRequest{
Version: 1,
ConsumerGroup: bom.parent.group,
ConsumerGroupGeneration: GroupGenerationUndefined,
}
} else {
r = &OffsetCommitRequest{
Version: 2,
RetentionTime: int64(bom.parent.conf.Consumer.Offsets.Retention / time.Millisecond),
ConsumerGroup: bom.parent.group,
ConsumerGroupGeneration: GroupGenerationUndefined,
}

}

for s := range bom.subscriptions {
Expand Down
37 changes: 37 additions & 0 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,43 @@ func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
coordinator.Close()
}

func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
testClient.Config().Consumer.Offsets.Retention = time.Hour

pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

ocResponse := new(OffsetCommitResponse)
ocResponse.AddError("my_topic", 0, ErrNoError)
handler := func(req *request) (res encoder) {
if req.body.version() != 2 {
t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
}
offsetCommitRequest := req.body.(*OffsetCommitRequest)
if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
}
return ocResponse
}
coordinator.setHandler(handler)

pom.MarkOffset(100, "modified_meta")
offset, meta := pom.NextOffset()

if offset != 101 {
t.Errorf("Expected offset 100. Actual: %v", offset)
}
if meta != "modified_meta" {
t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
}

safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}

func TestPartitionOffsetManagerCommitErr(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
Expand Down

0 comments on commit 262eef8

Please sign in to comment.