
Send messages and errors directly in consumer mock #555

Merged 2 commits into IBM:master on Oct 28, 2015

Conversation

aaronkavlie-wf (Contributor):

The mock PartitionConsumer sent messages to an expectations channel
on calls to YieldMessage and YieldError. This led to race conditions
in tests if pc.Messages was called before the handleExpectations
goroutine added a message to the channel.

This PR sends message and errors to their respective channels
directly. The expectations channel & goroutine are removed
altogether, simplifying the code.

This resolves issue #553.

@eapache @wvanbergen
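
For context, a minimal sketch of the direct-send approach described above (the pc.messages, pc.errors, pc.topic, and pc.partition field names are assumptions for illustration, not necessarily the exact names in the PR):

func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
	// Bump the mock's high water mark and hand the message straight to the
	// channel returned by pc.Messages(), with no intermediate goroutine.
	pc.highWaterMarkOffset += 1
	msg.Offset = pc.highWaterMarkOffset
	pc.messages <- msg
}

func (pc *PartitionConsumer) YieldError(err error) {
	// Wrap the error and send it directly on the errors channel.
	pc.errors <- &sarama.ConsumerError{Topic: pc.topic, Partition: pc.partition, Err: err}
}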

@@ -289,7 +261,13 @@ func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
// reasons for this not to happen. You can call ExpectMessagesDrainedOnClose so it will
// verify that the channel is empty on close.
func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
pc.expectations <- &consumerExpectation{Msg: msg}
pc.highWaterMarkOffset += 1
Contributor:
Does this still need to be atomic and/or locked somehow? I guess it depends on if people expect to be able to call this method multiple times concurrently, but I don't know if that's true (it was safe to call the old one concurrently).

Contributor:

It's atomic in sarama, so I prefer the mock to be atomic as well to match the sarama API as well as possible.

aaronkavlie-wf (Author):

I removed the atomic operations because it's no longer executed in a goroutine... then again, handleExpectations uses a for loop anyway, so that alone would not lead to race conditions. So I guess it's not totally clear to me why they were there.

If atomic operations are needed in case someone calls code concurrently, that means that plain assignments are never safe in public APIs.

Contributor:

In sarama it's atomic, because it's possible to have the processing lag monitoring in a separate goroutine. So it's not so much about internal use inside the mock, but more about how the API is used.
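
To illustrate the pattern being discussed, a sketch of the atomic write/read pairing (assuming highWaterMarkOffset is an int64 field; this is not necessarily the exact sarama code):

import "sync/atomic"

// Writer side: the offset bump is a single atomic operation.
func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
	atomic.AddInt64(&pc.highWaterMarkOffset, 1)
	pc.messages <- msg
}

// Reader side, e.g. lag monitoring polling from another goroutine,
// sees a consistent value without any locking.
func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
	return atomic.LoadInt64(&pc.highWaterMarkOffset)
}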

@eapache (Contributor) commented Oct 28, 2015:

One thought, but generally LGTM.

@wvanbergen (Contributor):
👍, just the non-atomic increment needs fixing.

@aaronkavlie-wf (Author):
switched back to the atomic operations.

wvanbergen added a commit that referenced this pull request Oct 28, 2015
Send messages and errors directly in consumer mock
@wvanbergen merged commit b81dd26 into IBM:master Oct 28, 2015
@wvanbergen (Contributor):

Thanks!

@@ -289,7 +260,13 @@ func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
// reasons for this not to happen. You can call ExpectMessagesDrainedOnClose so it will
// verify that the channel is empty on close.
func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
pc.expectations <- &consumerExpectation{Msg: msg}
atomic.AddInt64(&pc.highWaterMarkOffset, 1)
Contributor:

two concurrent calls to this method can still interleave, resulting in incorrect msg.Offset values

aaronkavlie-wf (Author):

Are you suggesting that lock/unlock should be added here too?

Contributor:

I think it is necessary? atomic.AddInt64 returns the new value, so you could collapse the atomic calls and reduce the window, but it would still be possible to interleave method calls such that messages ended up in the channel in the wrong order (e.g. offsets 1 2 4 3 instead of 1 2 3 4). A lock seems the obvious way to force the highWaterMarkOffset change to occur synchronously with the channel write.

Note that the variable will still need to be edited atomically in order to avoid locking HighWaterMarkOffset() too.
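
A sketch of what that combination could look like (the mutex field pc.l is an assumed name; the atomic store is what keeps HighWaterMarkOffset() lock-free):

func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) {
	pc.l.Lock()
	defer pc.l.Unlock()

	// atomic.AddInt64 returns the new value, so the offset assignment and
	// the channel send happen together under the lock and cannot interleave
	// with another YieldMessage call.
	msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1)
	pc.messages <- msg
}

func (pc *PartitionConsumer) HighWaterMarkOffset() int64 {
	// The atomic load means readers never need to take pc.l.
	return atomic.LoadInt64(&pc.highWaterMarkOffset)
}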

aaronkavlie-wf (Author):

I could add it in, this is already merged... will it still work on this branch, or should I do another?

Contributor:

another branch is probably simpler
