Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OffsetManager Implementation #461

Merged
merged 24 commits into from
Aug 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e0c0da8
First draft implementation of an offset manager
eapache May 7, 2015
90ba479
Track dirty flag in partition-offset-manager
eapache Aug 10, 2015
e7d8e17
Don't shutdown until the latest offset was successfully committed
eapache Aug 10, 2015
9e2380c
Don't start setting until the initial get succeeds
eapache Aug 13, 2015
b4fb5b7
Only add dirty subscriptions to the request
eapache Aug 15, 2015
9d4ca02
Fix abort()
eapache Aug 15, 2015
e98389a
Fix some error return codes
aaronkavlie-wf Aug 15, 2015
84abfe4
Skip clean subscriptions when parsing responses
eapache Aug 17, 2015
1d96533
Start offset manager tests
aaronkavlie-wf Aug 11, 2015
5f0a997
Test handling of commit errors when an offset is set
aaronkavlie-wf Aug 12, 2015
71aee66
Test of one fetchInitialOffset error
aaronkavlie-wf Aug 12, 2015
67f88f7
Break out repeated code to separate init function
aaronkavlie-wf Aug 13, 2015
132f582
Tests of error responses in fetchInitialOffset
aaronkavlie-wf Aug 13, 2015
147e2fe
Test to covert abort process.
aaronkavlie-wf Aug 13, 2015
50afd11
Close open connections; refine and clean up code.
aaronkavlie-wf Aug 14, 2015
2ad9fa7
Complete coverage of flushToBroker()
aaronkavlie-wf Aug 17, 2015
33f2f9f
Misc. small changes to address PR feedback
aaronkavlie-wf Aug 17, 2015
1e19f5c
Improve godoc of PartitionOffsetManager interface
wvanbergen Aug 19, 2015
1e8c4f2
Add Close() error to OffsetManager interface
wvanbergen Aug 19, 2015
de8e312
Merge pull request #518 from Shopify/offset-manager-close
wvanbergen Aug 19, 2015
5109f97
OffsetManager: make initial offset configurable
wvanbergen Aug 19, 2015
e8c3c16
Add a functional test for the offset manager.
wvanbergen Aug 21, 2015
0ae0505
Deregister a partition offset manager in the OffsetManager when it is…
wvanbergen Aug 21, 2015
4c0d6fc
Merge pull request #520 from Shopify/offset-manager-initial
wvanbergen Aug 21, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ type Config struct {
// If enabled, any errors that occured while consuming are returned on the Errors channel (default disabled).
Errors bool
}

// Offsets specifies configuration for how and when to commit consumed offsets. This currently requires the
// manual use of an OffsetManager but will eventually be automated.
Offsets struct {
// How frequently to commit updated offsets. Defaults to 10s.
CommitInterval time.Duration

// The initial offset to use if no offset was previously committed. Should be OffsetNewest or OffsetOldest.
// Defaults to OffsetNewest.
Initial int64
}
}

// A user-provided string sent with every request to the brokers for logging, debugging, and auditing purposes.
Expand Down Expand Up @@ -164,6 +175,8 @@ func NewConfig() *Config {
c.Consumer.MaxWaitTime = 250 * time.Millisecond
c.Consumer.MaxProcessingTime = 100 * time.Millisecond
c.Consumer.Return.Errors = false
c.Consumer.Offsets.CommitInterval = 10 * time.Second
c.Consumer.Offsets.Initial = OffsetNewest

c.ChannelBufferSize = 256

Expand Down Expand Up @@ -263,6 +276,11 @@ func (c *Config) Validate() error {
return ConfigurationError("Consumer.MaxProcessingTime must be > 0")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
case c.Consumer.Offsets.CommitInterval <= 0:
return ConfigurationError("Consumer.Offsets.CommitInterval must be > 0")
case c.Consumer.Offsets.Initial != OffsetOldest && c.Consumer.Offsets.Initial != OffsetNewest:
return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")

}

// validate misc shared values
Expand Down
51 changes: 51 additions & 0 deletions functional_offset_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package sarama

import (
"testing"
)

func TestFuncOffsetManager(t *testing.T) {
checkKafkaVersion(t, "0.8.2")
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

client, err := NewClient(kafkaBrokers, nil)
if err != nil {
t.Fatal(err)
}

offsetManager, err := NewOffsetManagerFromClient("sarama.TestFuncOffsetManager", client)
if err != nil {
t.Fatal(err)
}

if _, err := offsetManager.ManagePartition("does_not_exist", 123); err != ErrUnknownTopicOrPartition {
t.Fatal("Expected ErrUnknownTopicOrPartition when starting a partition offset manager for a partition that does not exist, got:", err)
}

pom1, err := offsetManager.ManagePartition("test.1", 0)
if err != nil {
t.Fatal(err)
}

pom1.MarkOffset(10, "test metadata")
safeClose(t, pom1)

pom2, err := offsetManager.ManagePartition("test.1", 0)
if err != nil {
t.Fatal(err)
}

offset, metadata := pom2.NextOffset()

if offset != 10+1 {
t.Errorf("Expected the next offset to be 11, found %d.", offset)
}
if metadata != "test metadata" {
t.Errorf("Expected metadata to be 'test metadata', found %s.", metadata)
}

safeClose(t, pom2)
safeClose(t, offsetManager)
safeClose(t, client)
}
2 changes: 1 addition & 1 deletion functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func checkKafkaAvailability(t testing.TB) {
func checkKafkaVersion(t testing.TB, requiredVersion string) {
kafkaVersion := os.Getenv("KAFKA_VERSION")
if kafkaVersion == "" {
t.Logf("No KAFKA_VERSION set. This tests requires Kafka version %s or higher. Continuing...", requiredVersion)
t.Logf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
} else {
available := parseKafkaVersion(kafkaVersion)
required := parseKafkaVersion(requiredVersion)
Expand Down
Loading