Skip to content

Commit

Permalink
Add support for custom offset retention durations to offset manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Matvey Arye committed Feb 10, 2016
1 parent 4ba9bba commit 233a87d
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 12 deletions.
44 changes: 32 additions & 12 deletions offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ type OffsetManager interface {
}

type offsetManager struct {
client Client
conf *Config
group string
client Client
conf *Config
group string
retention time.Duration

lock sync.Mutex
poms map[string]map[int32]*partitionOffsetManager
Expand All @@ -34,17 +35,25 @@ type offsetManager struct {
// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
// It is still necessary to call Close() on the underlying client when finished with the partition manager.
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
return NewOffsetManagerFromClientWithRetention(group, client, time.Duration(0))
}

// NewOffsetManagerFromClientWithRetention creates a new OffsetManager from the given client with an explict retention duration.
// Offsets are saved for the retention duration, after which they are deleted (defaults to offsets.retention.minutes broker option).
// It is still necessary to call Close() on the underlying client when finished with the partition manager.
func NewOffsetManagerFromClientWithRetention(group string, client Client, retention time.Duration) (OffsetManager, error) {
// Check that we are not dealing with a closed Client before processing any other arguments
if client.Closed() {
return nil, ErrClosedClient
}

om := &offsetManager{
client: client,
conf: client.Config(),
group: group,
poms: make(map[string]map[int32]*partitionOffsetManager),
boms: make(map[*Broker]*brokerOffsetManager),
client: client,
conf: client.Config(),
group: group,
retention: retention,
poms: make(map[string]map[int32]*partitionOffsetManager),
boms: make(map[*Broker]*brokerOffsetManager),
}

return om, nil
Expand Down Expand Up @@ -475,10 +484,21 @@ func (bom *brokerOffsetManager) flushToBroker() {
}

func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
r := &OffsetCommitRequest{
Version: 1,
ConsumerGroup: bom.parent.group,
ConsumerGroupGeneration: GroupGenerationUndefined,
var r *OffsetCommitRequest
if bom.parent.retention == 0 {
r = &OffsetCommitRequest{
Version: 1,
ConsumerGroup: bom.parent.group,
ConsumerGroupGeneration: GroupGenerationUndefined,
}
} else {
r = &OffsetCommitRequest{
Version: 2,
RetentionTime: int64(bom.parent.retention / time.Millisecond),
ConsumerGroup: bom.parent.group,
ConsumerGroupGeneration: GroupGenerationUndefined,
}

}

for s := range bom.subscriptions {
Expand Down
29 changes: 29 additions & 0 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,35 @@ func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
coordinator.Close()
}

func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
_, testClient, broker, coordinator := initOffsetManager(t)
om, err := NewOffsetManagerFromClientWithRetention("group", testClient, time.Hour)
if err != nil {
t.Fatal(err)
}
pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")

ocResponse := new(OffsetCommitResponse)
ocResponse.AddError("my_topic", 0, ErrNoError)
coordinator.Returns(ocResponse)

pom.MarkOffset(100, "modified_meta")
offset, meta := pom.NextOffset()

if offset != 101 {
t.Errorf("Expected offset 100. Actual: %v", offset)
}
if meta != "modified_meta" {
t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
}

safeClose(t, pom)
safeClose(t, om)
safeClose(t, testClient)
broker.Close()
coordinator.Close()
}

func TestPartitionOffsetManagerCommitErr(t *testing.T) {
om, testClient, broker, coordinator := initOffsetManager(t)
pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")
Expand Down

0 comments on commit 233a87d

Please sign in to comment.