Skip to content

Commit

Permalink
Merge pull request #431 from Shopify/consumer_topics_partitions
Browse files Browse the repository at this point in the history
Add Topics() and Partitions() to Consumer interface, and mock Consumer implementation.
  • Loading branch information
wvanbergen committed Apr 20, 2015
2 parents 4b2a9ef + 2052bd9 commit 3534171
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 1 deletion.
17 changes: 17 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ func (ce ConsumerErrors) Error() string {
// on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
// scope.
type Consumer interface {

// Topics returns the set of available topics as retrieved from the cluster metadata.
// This method is the same as Client.Topics(), and is provided for convenience.
Topics() ([]string, error)

// Partitions returns the sorted list of all partition IDs for the given topic.
// This method is the same as Client.Pertitions(), and is provided for convenience.
Partitions(topic string) ([]int32, error)

// ConsumePartition creates a PartitionConsumer on the given topic/partition with the given offset. It will
// return an error if this Consumer is already consuming on the given topic/partition. Offset can be a
// literal offset, or OffsetNewest or OffsetOldest
Expand Down Expand Up @@ -98,6 +107,14 @@ func (c *consumer) Close() error {
return nil
}

func (c *consumer) Topics() ([]string, error) {
return c.client.Topics()
}

func (c *consumer) Partitions(topic string) ([]int32, error) {
return c.client.Partitions(topic)
}

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
child := &partitionConsumer{
consumer: c,
Expand Down
43 changes: 43 additions & 0 deletions mocks/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Consumer struct {
t ErrorReporter
config *sarama.Config
partitionConsumers map[string]map[int32]*PartitionConsumer
metadata map[string][]int32
}

// NewConsumer returns a new mock Consumer instance. The t argument should
Expand Down Expand Up @@ -62,6 +63,39 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64)
return pc, nil
}

// Topics returns a list of topics, as registered with SetMetadata
func (c *Consumer) Topics() ([]string, error) {
c.l.Lock()
defer c.l.Unlock()

if c.metadata == nil {
c.t.Errorf("Unexpected call to Topics. Initialize the mock's topic metadata with SetMetadata.")
return nil, sarama.ErrOutOfBrokers
}

var result []string
for topic, _ := range c.metadata {
result = append(result, topic)
}
return result, nil
}

// Partitions returns the list of parititons for the given topic, as registered with SetMetadata
func (c *Consumer) Partitions(topic string) ([]int32, error) {
c.l.Lock()
defer c.l.Unlock()

if c.metadata == nil {
c.t.Errorf("Unexpected call to Partitions. Initialize the mock's topic metadata with SetMetadata.")
return nil, sarama.ErrOutOfBrokers
}
if c.metadata[topic] == nil {
return nil, sarama.ErrUnknownTopicOrPartition
}

return c.metadata[topic], nil
}

// Close implements the Close method from the sarama.Consumer interface. It will close
// all registered PartitionConsumer instances.
func (c *Consumer) Close() error {
Expand All @@ -81,6 +115,15 @@ func (c *Consumer) Close() error {
// Expectation API
///////////////////////////////////////////////////

// SetMetadata sets the clusters topic/partition metadata,
// which will be returned by Topics() and Partitions().
func (c *Consumer) SetTopicMetadata(metadata map[string][]int32) {
c.l.Lock()
defer c.l.Unlock()

c.metadata = metadata
}

// ExpectConsumePartition will register a topic/partition, so you can set expectations on it.
// The registered PartitionConsumer will be returned, so you can set expectations
// on it using method chanining. Once a topic/partition is registered, you are
Expand Down
59 changes: 58 additions & 1 deletion mocks/consumer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mocks

import (
"sort"
"testing"

"github.com/Shopify/sarama"
Expand Down Expand Up @@ -187,6 +188,62 @@ func TestConsumerMeetsErrorsDrainedExpectation(t *testing.T) {
}

if len(trm.errors) != 0 {
t.Errorf("Expected ano expectation failures to be set on the error reporter.")
t.Errorf("Expected no expectation failures to be set on the error reporter.")
}
}

func TestConsumerTopicMetadata(t *testing.T) {
trm := newTestReporterMock()
consumer := NewConsumer(trm, nil)

consumer.SetTopicMetadata(map[string][]int32{
"test1": []int32{0, 1, 2, 3},
"test2": []int32{0, 1, 2, 3, 4, 5, 6, 7},
})

topics, err := consumer.Topics()
if err != nil {
t.Error(t)
}

sortedTopics := sort.StringSlice(topics)
sortedTopics.Sort()
if len(sortedTopics) != 2 || sortedTopics[0] != "test1" || sortedTopics[1] != "test2" {
t.Error("Unexpected topics returned:", sortedTopics)
}

partitions1, err := consumer.Partitions("test1")
if err != nil {
t.Error(t)
}

if len(partitions1) != 4 {
t.Error("Unexpected partitions returned:", len(partitions1))
}

partitions2, err := consumer.Partitions("test2")
if err != nil {
t.Error(t)
}

if len(partitions2) != 8 {
t.Error("Unexpected partitions returned:", len(partitions2))
}

if len(trm.errors) != 0 {
t.Errorf("Expected no expectation failures to be set on the error reporter.")
}
}

func TestConsumerUnexpectedTopicMetadata(t *testing.T) {
trm := newTestReporterMock()
consumer := NewConsumer(trm, nil)

if _, err := consumer.Topics(); err != sarama.ErrOutOfBrokers {
t.Error("Expected sarama.ErrOutOfBrokers, found", err)
}

if len(trm.errors) != 1 {
t.Errorf("Expected an expectation failure to be set on the error reporter.")
}
}

0 comments on commit 3534171

Please sign in to comment.