Send messages and errors directly in consumer mock #555

Merged · 2 commits · Oct 28, 2015
mocks/consumer.go: 59 changes (20 additions, 39 deletions)
@@ -60,7 +60,6 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, offset int64)
	}

	pc.consumed = true
-	go pc.handleExpectations()
	return pc, nil
}

@@ -141,13 +140,12 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset

	if c.partitionConsumers[topic][partition] == nil {
		c.partitionConsumers[topic][partition] = &PartitionConsumer{
-			t:            c.t,
-			topic:        topic,
-			partition:    partition,
-			offset:       offset,
-			expectations: make(chan *consumerExpectation, 1000),
-			messages:     make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
-			errors:       make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
+			t:         c.t,
+			topic:     topic,
+			partition: partition,
+			offset:    offset,
+			messages:  make(chan *sarama.ConsumerMessage, c.config.ChannelBufferSize),
+			errors:    make(chan *sarama.ConsumerError, c.config.ChannelBufferSize),
		}
	}

@@ -164,12 +162,10 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset
// Errors and Messages channel, you should specify what values will be provided on these
// channels using YieldMessage and YieldError.
type PartitionConsumer struct {
-	l            sync.Mutex
	t            ErrorReporter
	topic        string
	partition    int32
	offset       int64
-	expectations chan *consumerExpectation
	messages     chan *sarama.ConsumerMessage
	errors       chan *sarama.ConsumerError
	singleClose  sync.Once
@@ -179,40 +175,15 @@ type PartitionConsumer struct {
	highWaterMarkOffset int64
}

-func (pc *PartitionConsumer) handleExpectations() {
-	pc.l.Lock()
-	defer pc.l.Unlock()
-
-	for ex := range pc.expectations {
-		if ex.Err != nil {
-			pc.errors <- &sarama.ConsumerError{
-				Topic:     pc.topic,
-				Partition: pc.partition,
-				Err:       ex.Err,
-			}
-		} else {
-			atomic.AddInt64(&pc.highWaterMarkOffset, 1)
-
-			ex.Msg.Topic = pc.topic
-			ex.Msg.Partition = pc.partition
-			ex.Msg.Offset = atomic.LoadInt64(&pc.highWaterMarkOffset)
-
-			pc.messages <- ex.Msg
-		}
-	}
-
-	close(pc.messages)
-	close(pc.errors)
-}

///////////////////////////////////////////////////
// PartitionConsumer interface implementation
///////////////////////////////////////////////////

// AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
func (pc *PartitionConsumer) AsyncClose() {
	pc.singleClose.Do(func() {
-		close(pc.expectations)
+		close(pc.messages)
+		close(pc.errors)
	})
}

@@ -289,7 +260,13 @@ func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
// reasons for this not to happen. You can call ExpectMessagesDrainedOnClose so it will
// verify that the channel is empty on close.
func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
-	pc.expectations <- &consumerExpectation{Msg: msg}
+	atomic.AddInt64(&pc.highWaterMarkOffset, 1)
Review comments on the atomic.AddInt64 line:

Contributor: two concurrent calls to this method can still interleave, resulting in incorrect msg.Offset values.

Contributor (author): Are you suggesting that lock/unlock should be added here too?

Contributor: I think it is necessary? atomic.AddInt64 returns the new value, so you could collapse the atomic calls and reduce the window, but it would still be possible to interleave method calls such that messages ended up in the channel in the wrong order (e.g. offsets 1 2 4 3 instead of 1 2 3 4). A lock seems the obvious way to force the highWaterMarkOffset change to occur synchronously with the channel write. Note that the variable will still need to be edited atomically in order to avoid locking HighWaterMarkOffset() too.

Contributor (author): I could add it in, but this is already merged... will it still work on this branch, or should I do another?

Contributor: another branch is probably simpler.

+	msg.Topic = pc.topic
+	msg.Partition = pc.partition
+	msg.Offset = atomic.LoadInt64(&pc.highWaterMarkOffset)
+
+	pc.messages <- msg
}
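
As an aside, here is a minimal sketch of the lock-based fix discussed in the review thread above. It assumes the l sync.Mutex field deleted by this PR is reinstated on PartitionConsumer, and it collapses the two atomic calls since atomic.AddInt64 returns the new value. This is illustrative only, not the merged code.

func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
	pc.l.Lock() // make the offset bump and the channel send one critical section
	defer pc.l.Unlock()

	msg.Topic = pc.topic
	msg.Partition = pc.partition
	// atomic.AddInt64 returns the incremented value, collapsing the
	// Add/Load pair; the write stays atomic so HighWaterMarkOffset()
	// can keep reading the field without taking the lock.
	msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1)

	pc.messages <- msg
}

With the lock held across both the increment and the send, two concurrent YieldMessage calls can no longer publish offsets out of order.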

// YieldError will yield an error on the Errors channel of this partition consumer
@@ -298,7 +275,11 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
// not to happen. You can call ExpectErrorsDrainedOnClose so it will verify that
// the channel is empty on close.
func (pc *PartitionConsumer) YieldError(err error) {
-	pc.expectations <- &consumerExpectation{Err: err}
+	pc.errors <- &sarama.ConsumerError{
+		Topic:     pc.topic,
+		Partition: pc.partition,
+		Err:       err,
+	}
}

// ExpectMessagesDrainedOnClose sets an expectation on the partition consumer
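
For context, a hypothetical test sketch (not part of this PR) showing the new direct-yield behavior; it assumes the public API of the mocks package as it appears in this diff, including NewConsumer accepting a nil config for package defaults.

package mocks_test

import (
	"testing"

	"github.com/Shopify/sarama"
	"github.com/Shopify/sarama/mocks"
)

func TestYieldsGoStraightToChannels(t *testing.T) {
	consumer := mocks.NewConsumer(t, nil) // nil: assume the package falls back to default config

	pcMock := consumer.ExpectConsumePartition("my_topic", 0, sarama.OffsetOldest)
	// With this PR there is no expectations goroutine: yields land directly
	// on the buffered Messages and Errors channels.
	pcMock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")})
	pcMock.YieldError(sarama.ErrOutOfBrokers)

	pc, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest)
	if err != nil {
		t.Fatal(err)
	}

	msg := <-pc.Messages()
	if string(msg.Value) != "hello" {
		t.Errorf("unexpected message: %s", msg.Value)
	}
	consumerErr := <-pc.Errors()
	if consumerErr.Err != sarama.ErrOutOfBrokers {
		t.Errorf("unexpected error: %v", consumerErr.Err)
	}

	pc.Close()
	if err := consumer.Close(); err != nil {
		t.Error(err)
	}
}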