-
Notifications
You must be signed in to change notification settings - Fork 60
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[1.2.0] Event ordering [API-1102] #696
[1.2.0] Event ordering [API-1102] #696
Conversation
Preserve order of execution of events handler which originates from the same partition
Codecov Report
@@ Coverage Diff @@
## master #696 +/- ##
==========================================
- Coverage 74.37% 72.17% -2.21%
==========================================
Files 311 330 +19
Lines 13571 15214 +1643
==========================================
+ Hits 10094 10981 +887
- Misses 2739 3458 +719
- Partials 738 775 +37
Continue to review full report at Codecov.
|
I think we should add more details to PRs. We experienced similar situation recently at .net side. @ihsandemir suggested it. Otherwise, reviewing becomes frustrating. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's some initial comments, will continue to review.
c3dd483
to
c1efb0c
Compare
} | ||
|
||
// dispatch sends the handler "task" to one of the appropriate taskQueues, "tasks" with the same key end up on the same queue. Returns true if queue is full and could not dispatch | ||
func (se stripeExecutor) dispatch(key int, task func()) (queueFull bool) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this function should return false
when the task cannot be dispatched. That could also help getting rid of the return variable name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is definitely more idiomatic thanks, addressed it at 3bb2d0f
e38029e
to
3bb2d0f
Compare
client_it_test.go
Outdated
@@ -140,6 +143,64 @@ func TestClientMemberEvents(t *testing.T) { | |||
}) | |||
} | |||
|
|||
func calcPartitionID(ss *serialization.Service, key interface{}) (int32, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this function may be named better. Maybe calculatePartitionID
as the last resort.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sadly, could not come up with a better one. Renamed at 5fadb18
} | ||
|
||
// newStripeExecutor returns a new stripeExecutor with configured queueCount and queueSize. | ||
func newStripeExecutorWithConfig(queueCount, queueSize int) (stripeExecutor, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about returning only the stripeExecutor? error
is always nil. We can update it if in the future we need to return an error. Also, should it return *stripeExecutor
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got rid off error at 67d42e5
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactored stripeExecutor to pointer semantics as you suggested at f8f778f
queueCount int | ||
key int | ||
expectedIndex int | ||
}{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should there be a test case for key == -1
as well? If it's hard to do that in this function due to random stuff, at the very least we should have a separate test that checks dispatch
works with negative integers too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great idea! Created a separate test for that at 99b7971
go func() { | ||
for _, task := range tasks { | ||
if ok := se.dispatch(task.key, task.handler); !ok { | ||
panic("could not dispatch event handler") | ||
} | ||
} | ||
}() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you refactor this, so each task is dispatched by a different goroutine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
List of tasks is created in a way that randomizes tasks but respect the order of them with respect to their keys, just like the situation on clusters. If we dispatch each one in a different goroutine, we cannot guarantee the order among the tasks of the same key.
for i := 1; i <= 1000; i++ { | ||
it.MustValue(m.Put(ctx, i, "test")) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you refactor this so m.Put
s run concurrently?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead implemented a blackbox one as we discussed df8e738
Use partition id of event ClientMessage, instead of stored "AddListener" invocation partition id which is 0 since it is not addressed to a specific partition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, looks good!
Fixes #692
Hazelcast Cluster members, respects the order of the events. Different order guarantees are given for different event types (see docs for detail)
Java Client summarizes these guarantees as "Order of the events originated from the same partition is preserved". For example events which belong to the same key of a map are received(on the clients) in the same order as they created. This is achieved via StripeExecutor implementation which is also mentioned in the link above.
Java Client processes incoming events on a StripedExecutor, by treating event handlers as "stripedRunnable"s with respect to their partitionID. Please check it for reference.
This PR fixes event processing (execution of event-handlers) logic by respecting the order of events from the same partition.