Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for custom offset retention durations to offset manager #602

Merged
merged 1 commit into from
Feb 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,13 @@ type Config struct {
// The initial offset to use if no offset was previously committed.
// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
Initial int64

// The retention duration for committed offsets. If zero, disabled
// (in which case the `offsets.retention.minutes` option on the
// broker will be used). Kafka only supports precision up to
// milliseconds; nanoseconds will be truncated.
// (default is 0: disabled).
Retention time.Duration
}
}

Expand Down Expand Up @@ -257,6 +264,9 @@ func (c *Config) Validate() error {
if c.Consumer.MaxWaitTime%time.Millisecond != 0 {
Logger.Println("Consumer.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
}
if c.Consumer.Offsets.Retention%time.Millisecond != 0 {
Logger.Println("Consumer.Offsets.Retention only supports millisecond precision; nanoseconds will be truncated.")
}
if c.ClientID == "sarama" {
Logger.Println("ClientID is the default of 'sarama', you should consider setting it to something application-specific.")
}
Expand Down
19 changes: 15 additions & 4 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,21 @@ func (bom *brokerOffsetManager) flushToBroker() {
}

func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
r := &OffsetCommitRequest{
Version: 1,
ConsumerGroup: bom.parent.group,
ConsumerGroupGeneration: GroupGenerationUndefined,
var r *OffsetCommitRequest
if bom.parent.conf.Consumer.Offsets.Retention == 0 {
r = &OffsetCommitRequest{
Version: 1,
ConsumerGroup: bom.parent.group,
ConsumerGroupGeneration: GroupGenerationUndefined,
}
} else {
r = &OffsetCommitRequest{
Version: 2,
RetentionTime: int64(bom.parent.conf.Consumer.Offsets.Retention / time.Millisecond),
ConsumerGroup: bom.parent.group,
ConsumerGroupGeneration: GroupGenerationUndefined,
}

}

for s := range bom.subscriptions {
Expand Down
37 changes: 37 additions & 0 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,43 @@ func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
coordinator.Close()
}

func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test doesn't actually test that the value in the request is set correctly. It should do that, or it (the test) should be removed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Is the sleep in here ok?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of a sleep and a coordinator.Returns you should be able to call coordinator.setHandler and validate the request values in the callback.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok changed, that does look a lot better.

om, testClient, broker, coordinator := initOffsetManager(t)
testClient.Config().Consumer.Offsets.Retention = time.Hour

pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

ocResponse := new(OffsetCommitResponse)
ocResponse.AddError("my_topic", 0, ErrNoError)
handler := func(req *request) (res encoder) {
if req.body.version() != 2 {
t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
}
offsetCommitRequest := req.body.(*OffsetCommitRequest)
if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
}
return ocResponse
}
coordinator.setHandler(handler)

pom.MarkOffset(100, "modified_meta")
offset, meta := pom.NextOffset()

if offset != 101 {
t.Errorf("Expected offset 100. Actual: %v", offset)
}
if meta != "modified_meta" {
t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
}

safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}

func TestPartitionOffsetManagerCommitErr(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
Expand Down