Skip to content

Commit

Permalink
Merge pull request #1112 from evan-stripe/extended-partitioner
Browse files Browse the repository at this point in the history
Make Partitioner.RequiresConsistency vary per-message
  • Loading branch information
eapache authored Jun 14, 2018
2 parents 05ded73 + 9b2c4d6 commit bc4e274
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 1 deletion.
9 changes: 8 additions & 1 deletion async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,14 @@ func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
var partitions []int32

err := tp.breaker.Run(func() (err error) {
if tp.partitioner.RequiresConsistency() {
var requiresConsistency = false
if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
requiresConsistency = ep.MessageRequiresConsistency(msg)
} else {
requiresConsistency = tp.partitioner.RequiresConsistency()
}

if requiresConsistency {
partitions, err = tp.parent.client.Partitions(msg.Topic)
} else {
partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
Expand Down
19 changes: 19 additions & 0 deletions partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,21 @@ type Partitioner interface {
RequiresConsistency() bool
}

// DynamicConsistencyPartitioner can optionally be implemented by Partitioners
// in order to allow more flexibility than is originally allowed by the
// RequiresConsistency method in the Partitioner interface. This allows
// partitioners to require consistency sometimes, but not all times. It's useful
// for, e.g., the HashPartitioner, which does not require consistency if the
// message key is nil.
type DynamicConsistencyPartitioner interface {
Partitioner

// MessageRequiresConsistency is similar to Partitioner.RequiresConsistency,
// but takes in the message being partitioned so that the partitioner can
// make a per-message determination.
MessageRequiresConsistency(message *ProducerMessage) bool
}

// PartitionerConstructor is the type for a function capable of constructing new Partitioners.
type PartitionerConstructor func(topic string) Partitioner

Expand Down Expand Up @@ -133,3 +148,7 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3
func (p *hashPartitioner) RequiresConsistency() bool {
return true
}

func (p *hashPartitioner) MessageRequiresConsistency(message *ProducerMessage) bool {
return message.Key != nil
}
18 changes: 18 additions & 0 deletions partitioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,24 @@ func TestHashPartitioner(t *testing.T) {
}
}

func TestHashPartitionerConsistency(t *testing.T) {
partitioner := NewHashPartitioner("mytopic")
ep, ok := partitioner.(DynamicConsistencyPartitioner)

if !ok {
t.Error("Hash partitioner does not implement DynamicConsistencyPartitioner")
}

consistency := ep.MessageRequiresConsistency(&ProducerMessage{Key: StringEncoder("hi")})
if !consistency {
t.Error("Messages with keys should require consistency")
}
consistency = ep.MessageRequiresConsistency(&ProducerMessage{})
if consistency {
t.Error("Messages without keys should require consistency")
}
}

func TestHashPartitionerMinInt32(t *testing.T) {
partitioner := NewHashPartitioner("mytopic")

Expand Down

0 comments on commit bc4e274

Please sign in to comment.