[1.2.0] Event ordering [API-1102] #696

Merged: 38 commits, Dec 31, 2021. The changes shown below are from 6 of the 38 commits.

Commits
f40f3fe
remove unused field
utku-caglayan Dec 5, 2021
134b168
fix typo
utku-caglayan Dec 5, 2021
e604b7e
Initial impl stripe executor impl:
utku-caglayan Dec 5, 2021
33e3d78
handle executor start and stop
utku-caglayan Dec 5, 2021
ac232b7
handle partitionID -1
utku-caglayan Dec 6, 2021
2563ce8
fix field alignment issues
utku-caglayan Dec 6, 2021
b546c1d
introduce "noSpecificPartition" variable to improve readability
utku-caglayan Dec 21, 2021
c4dc047
make queue size and worker count configurable
utku-caglayan Dec 21, 2021
946ba9c
Merge branch 'master' into event-ordering
utku-caglayan Dec 21, 2021
eff8e8f
temporary fix for memory leak test
utku-caglayan Dec 21, 2021
22155c9
decrease queue capacity, and document the differences
utku-caglayan Dec 21, 2021
2f91eed
decrease queue capacity, and document the differences
utku-caglayan Dec 22, 2021
16dd0b2
remove event executor configuration
utku-caglayan Dec 24, 2021
7ae872c
address pr reviews for stripe executor
utku-caglayan Dec 24, 2021
158c62e
fix failing "TestMarshalDefaultConfig" test
utku-caglayan Dec 24, 2021
3491021
fix failing test
utku-caglayan Dec 24, 2021
c1efb0c
rename "ind" to "i"
utku-caglayan Dec 24, 2021
3e90236
apply review suggestions
utku-caglayan Dec 29, 2021
b574499
add map event order test
utku-caglayan Dec 29, 2021
e51f5d3
increase TestClientStartShutdownMemoryLeak memory limit
utku-caglayan Dec 29, 2021
9011983
refactor TestClientEventHandlingOrder to group events by partitionID
utku-caglayan Dec 29, 2021
f7df88d
refactor int32 to int
utku-caglayan Dec 29, 2021
03fe449
refactor TestClientEventHandlingOrder logic
utku-caglayan Dec 29, 2021
1c7a575
change behavior to not block on "dispatch" if event queue is full
utku-caglayan Dec 29, 2021
2b65d5a
remove unused func
utku-caglayan Dec 29, 2021
3bb2d0f
refactor return of "dispatch"
utku-caglayan Dec 30, 2021
ab26f81
undo older bad merge
utku-caglayan Dec 30, 2021
6faa426
fix test to start partitionIDs from 0
utku-caglayan Dec 30, 2021
db4dcd6
improve tests
utku-caglayan Dec 30, 2021
99b7971
add dispatch zero&negative key test
utku-caglayan Dec 30, 2021
67d42e5
refactor constructor to raise panic on invalid conf
utku-caglayan Dec 30, 2021
4d0c38c
refactor test for stability
utku-caglayan Dec 30, 2021
f8f778f
refactor stripeExecutor to pointer semantics
utku-caglayan Dec 30, 2021
50b5bd5
minor improvement
utku-caglayan Dec 30, 2021
feab9ed
fix major bug
utku-caglayan Dec 30, 2021
df8e738
implement a blackbox event order test that fails on previous event or…
utku-caglayan Dec 30, 2021
5fadb18
rename a helper func
utku-caglayan Dec 30, 2021
76ee128
refactor struct for padding
utku-caglayan Dec 31, 2021
21 changes: 0 additions & 21 deletions internal/cluster/connection_listener_binder.go
@@ -46,7 +46,6 @@ type ConnectionListenerBinder struct {
 	regs                  map[types.UUID]listenerRegistration
 	correlationIDs        map[types.UUID][]int64
 	subscriptionToMembers map[types.UUID]map[types.UUID]struct{}
-	memberSubscriptions   map[types.UUID][]types.UUID
 	regsMu                *sync.RWMutex
 	connectionCount       int32
 	smart                 bool
@@ -67,7 +66,6 @@ func NewConnectionListenerBinder(
 		regs:                  map[types.UUID]listenerRegistration{},
 		correlationIDs:        map[types.UUID][]int64{},
 		subscriptionToMembers: map[types.UUID]map[types.UUID]struct{}{},
-		memberSubscriptions:   map[types.UUID][]types.UUID{},
 		regsMu:                &sync.RWMutex{},
 		logger:                logger,
 		smart:                 smart,
@@ -123,9 +121,6 @@ func (b *ConnectionListenerBinder) Remove(ctx context.Context, id types.UUID) er
 	b.logger.Trace(func() string {
 		return fmt.Sprintf("removing listener %s:\nconns: %v,\nregs: %v", id, conns, b.regs)
 	})
-	for _, conn := range conns {
-		b.removeMemberSubscriptions(conn.memberUUID)
-	}
 	return b.sendRemoveListenerRequests(ctx, reg.removeRequest, conns...)
 }

@@ -269,9 +264,6 @@ func (b *ConnectionListenerBinder) handleConnectionOpened(e *ConnectionStateChan

 func (b *ConnectionListenerBinder) handleConnectionClosed(e *ConnectionStateChangedEvent) {
 	atomic.AddInt32(&b.connectionCount, -1)
-	b.regsMu.Lock()
-	b.removeMemberSubscriptions(e.Conn.memberUUID)
-	b.regsMu.Unlock()
 }

 func (b *ConnectionListenerBinder) connExists(conn *Connection, subID types.UUID) bool {
@@ -291,17 +283,4 @@ func (b *ConnectionListenerBinder) addSubscriptionToMember(subID types.UUID, mem
 		b.subscriptionToMembers[subID] = mems
 	}
 	mems[memberUUID] = struct{}{}
-	b.memberSubscriptions[memberUUID] = append(b.memberSubscriptions[memberUUID], subID)
 }
-
-func (b *ConnectionListenerBinder) removeMemberSubscriptions(memberUUID types.UUID) {
-	// this method should be called under lock
-	subs, found := b.memberSubscriptions[memberUUID]
-	if !found {
-		return
-	}
-	for _, sub := range subs {
-		delete(b.subscriptionToMembers, sub)
-	}
-	delete(b.memberSubscriptions, memberUUID)
-}
31 changes: 22 additions & 9 deletions internal/invocation/invocation_service.go
@@ -41,23 +41,24 @@ type Handler interface {
 }

 type Service struct {
-	handler         Handler
-	logger          ilogger.Logger
 	requestCh       chan Invocation
-	urgentRequestCh chan Invocation
 	responseCh      chan *proto.ClientMessage
-	// removeCh carries correlationIDs to be removed
-	removeCh        chan int64
 	doneCh          chan struct{}
 	groupLostCh     chan *GroupLostEvent
 	invocations     map[int64]Invocation
+	handler         Handler
+	urgentRequestCh chan Invocation
 	eventDispatcher *event.DispatchService
-	state           int32
+	logger          ilogger.Logger
+	// removeCh carries correlationIDs to be removed
+	removeCh chan int64
+	executor stripeExecutor
+	state    int32
 }

 func NewService(
 	handler Handler,
-	eventDispacher *event.DispatchService,
+	eventDispatcher *event.DispatchService,
 	logger ilogger.Logger) *Service {
 	s := &Service{
 		requestCh:       make(chan Invocation),
@@ -68,15 +69,17 @@ func NewService(
 		groupLostCh:     make(chan *GroupLostEvent),
 		invocations:     map[int64]Invocation{},
 		handler:         handler,
-		eventDispatcher: eventDispacher,
+		eventDispatcher: eventDispatcher,
 		logger:          logger,
 		state:           ready,
+		executor:        newStripeExecutor(5, 100),
 	}
 	s.eventDispatcher.Subscribe(EventGroupLost, serviceSubID, func(event event.Event) {
 		go func() {
 			s.groupLostCh <- event.(*GroupLostEvent)
 		}()
 	})
+	s.executor.start()
 	go s.processIncoming()
 	return s
 }
@@ -85,6 +88,7 @@ func (s *Service) Stop() {
 	if !atomic.CompareAndSwapInt32(&s.state, ready, stopped) {
 		return
 	}
+	s.executor.stop()
 	close(s.doneCh)
 }

@@ -183,7 +187,16 @@ func (s *Service) handleClientMessage(msg *proto.ClientMessage) {
 			return fmt.Sprintf("invocation with unknown correlation ID: %d", correlationID)
 		})
 	} else if inv.EventHandler() != nil {
-		go inv.EventHandler()(msg)
+		handler := func() {
+			inv.EventHandler()(msg)
+		}
+		if inv.PartitionID() == -1 {
+			// Execute on a random worker
+			s.executor.dispatchRandom(handler)
+			return
+		}
+		partitionID := uint32(inv.PartitionID())
+		s.executor.dispatch(partitionID, handler)
 	}
 	return
 }
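The change above routes every incoming event through the stripe executor: an event with a real partition ID is keyed by it, so all events for that partition land on the same worker in arrival order, while partition-agnostic events (PartitionID() == -1) go to a random worker. A minimal standalone sketch of that routing rule, not part of the diff; queueFor and the queue count of 5 (mirroring newStripeExecutor(5, 100) above) are illustrative assumptions:

package main

import "fmt"

// queueFor mirrors the dispatch decision in handleClientMessage: a real
// partition ID is reduced modulo the queue count, so every event of that
// partition lands on one worker; -1 means "no specific partition".
const queueCount = 5 // assumption: matches newStripeExecutor(5, 100)

func queueFor(partitionID int32) string {
	if partitionID == -1 {
		return "random worker"
	}
	return fmt.Sprintf("worker %d", uint32(partitionID)%queueCount)
}

func main() {
	for _, pid := range []int32{-1, 0, 7, 12, 7} {
		fmt.Printf("partition %3d -> %s\n", pid, queueFor(pid))
	}
	// partition 7 maps to worker 2 both times, so its events keep their order.
}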
73 changes: 73 additions & 0 deletions internal/invocation/stripe_executor.go
package invocation

import (
	"math/rand"
	"sync"
)

// stripeExecutor executes given "tasks" preserving the order among the ones
// that are given with the same key.
type stripeExecutor struct {
	quit            chan struct{}
	wg              *sync.WaitGroup
	executeFunction func(queue chan func(), quit chan struct{}, wg *sync.WaitGroup)
	tasks           []chan func()
	queueCount      uint32
}

// newStripeExecutor returns a new stripeExecutor with the configured queueCount and queueSize.
func newStripeExecutor(queueCount, queueSize uint32) stripeExecutor {
	se := stripeExecutor{
		tasks:      make([]chan func(), queueCount),
		queueCount: queueCount,
	}
	for ind := range se.tasks {
		se.tasks[ind] = make(chan func(), queueSize)
	}
	se.quit = make(chan struct{})
	se.wg = &sync.WaitGroup{}
	se.executeFunction = defaultExecuteFnc
	return se
}

// start fires up the workers, one for each queue.
func (se stripeExecutor) start() {
	se.wg.Add(int(se.queueCount))
	for ind := range se.tasks {
		ind := ind
		go se.executeFunction(se.tasks[ind], se.quit, se.wg)
	}
}

// dispatch sends the handler "task" to the appropriate queue; "tasks"
// with the same key end up on the same queue.
func (se stripeExecutor) dispatch(key uint32, handler func()) {
	se.tasks[key%se.queueCount] <- handler
}

// dispatchRandom sends the handler "task" to a randomly picked queue.
func (se stripeExecutor) dispatchRandom(handler func()) {
	key := rand.Int31n(int32(se.queueCount))
	se.dispatch(uint32(key), handler)
}

// stop blocks until all workers are stopped.
func (se stripeExecutor) stop() {
	close(se.quit)
	se.wg.Wait()
}

// setExecutorFnc replaces the worker loop. It needs a pointer receiver:
// on a value receiver the assignment would mutate a copy and be lost.
func (se *stripeExecutor) setExecutorFnc(custom func(queue chan func(), quit chan struct{}, wg *sync.WaitGroup)) {
	se.executeFunction = custom
}

func defaultExecuteFnc(queue chan func(), quit chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case task := <-queue:
			task()
		case <-quit:
			return
		}
	}
}
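To make the ordering contract concrete, here is a minimal usage sketch. It is not part of the diff: it assumes it runs inside package invocation (the type is unexported), and the worker count, queue capacity, and key 42 are arbitrary choices for illustration.

package invocation

import (
	"fmt"
	"time"
)

// Sketch: tasks dispatched with the same key run on the same worker
// goroutine, so they execute in dispatch order; tasks with different
// keys may interleave freely.
func exampleStripeUsage() {
	se := newStripeExecutor(4, 16) // 4 workers, each with a queue of capacity 16
	se.start()
	for i := 1; i <= 3; i++ {
		i := i
		// Key 42 always maps to queue 42 % 4 = 2, so these three tasks
		// run on the same worker and print 1, 2, 3 in order.
		se.dispatch(42, func() { fmt.Println("key 42, task", i) })
	}
	// A task with no natural key may land on any worker; it has no
	// ordering guarantee relative to the keyed tasks above.
	se.dispatchRandom(func() { fmt.Println("keyless task") })
	time.Sleep(100 * time.Millisecond) // crude drain, just for the sketch
	// stop closes quit and waits for the workers; tasks still queued when
	// a worker observes quit may never run, hence the sleep above.
	se.stop()
}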
139 changes: 139 additions & 0 deletions internal/invocation/stripe_executor_test.go
@@ -0,0 +1,139 @@
package invocation

import (
"fmt"
"math/rand"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
)

type orderChecker struct {
*testing.T
previousCallArg int
}

func (oc *orderChecker) call(arg int) {
assert.Equal(oc.T, oc.previousCallArg+1, arg, "order of the tasks is not preserved")
oc.previousCallArg = arg
}

func Test_defaultExecuteFnc(t *testing.T) {
oc := &orderChecker{T: t}
tasks := make(chan func(), 3)
for i := 1; i < 4; i++ {
tmp := i
tasks <- func() {
oc.call(tmp)
}
}
quit := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)
go defaultExecuteFnc(tasks, quit, &wg)
assert.Eventually(t, func() bool {
return oc.previousCallArg == 3
}, time.Second, time.Millisecond*200, "execute function could not finish the tasks")
close(quit)
go func() {
wg.Wait()
// Just to see if goroutine finished
oc.previousCallArg = 10
}()
assert.Eventually(t, func() bool {
return oc.previousCallArg == 10
}, time.Second, time.Millisecond*200, "execute function did not notify about its finish")
}

func Test_serialExecutor_dispatch(t *testing.T) {
tests := []struct {
queueCount uint32
key uint32
expectedIndex int32
}{
Review comment (Collaborator): Should there be a test case for key == -1 as well? If it's hard to do that in this function due to random stuff, at the very least we should have a separate test that checks dispatch works with negative integers too.

Reply (Contributor, author): Great idea! Created a separate test for that at 99b7971
{
queueCount: 4,
key: 2,
expectedIndex: 2,
},
{
queueCount: 2,
key: 2,
expectedIndex: 0,
},
{
queueCount: 2,
key: 3,
expectedIndex: 1,
},
{
queueCount: 2,
key: 4,
expectedIndex: 0,
},
{
queueCount: 1,
key: 5,
expectedIndex: 0,
},
}
for ind, tt := range tests {
t.Run(fmt.Sprintf("QueueCount: %d, Key: %d", tt.queueCount, tt.key), func(t *testing.T) {
se := newStripeExecutor(tt.queueCount, 0)
tmpHandler := func() {
panic(ind)
}
go se.dispatch(tt.key, tmpHandler)
select {
case <-se.tasks[tt.expectedIndex]:
case <-time.After(time.Second):
assert.FailNow(t, "dispatcher did not dispatch to correct queue")
}
})
}
}

func Test_serialExecutor_start(t *testing.T) {
t.Logf("enabled leak check")
defer goleak.VerifyNone(t)
t.Run("Functionality test", func(t *testing.T) {
var orderCheckers []*orderChecker
type pair struct {
handler func()
key uint32
}
// create orderCheckers, index corresponding to key
for i := 1; i <= 100; i++ {
orderCheckers = append(orderCheckers, &orderChecker{T: t})
}
// populate task queues
// assume we have orderCheckers a,b,c, we will have
// a1,b1,c1,b2,c2,a2,a3,c3,b3
var tasks []pair
for i := 1; i <= 3; i++ {
tmp := i
for _, perm := range rand.Perm(100) {
key := perm
tasks = append(tasks, pair{key: uint32(key), handler: func() {
orderCheckers[key].call(tmp)
}})
}
}

se := newStripeExecutor(3, 3)
se.start()
go func() {
for _, task := range tasks {
se.dispatch(task.key, task.handler)
}
}()
time.Sleep(time.Second * 1)
for _, oc := range orderCheckers {
assert.Equal(t, 3, oc.previousCallArg, "task did not complete")
}
se.stop()
})
}