[1.2.0] Event ordering [API-1102] #696

Merged 38 commits on Dec 31, 2021
Commits
f40f3fe
remove unused field
utku-caglayan Dec 5, 2021
134b168
fix typo
utku-caglayan Dec 5, 2021
e604b7e
Initial impl stripe executor impl:
utku-caglayan Dec 5, 2021
33e3d78
handle executor start and stop
utku-caglayan Dec 5, 2021
ac232b7
handle partitionID -1
utku-caglayan Dec 6, 2021
2563ce8
fix field alignment issues
utku-caglayan Dec 6, 2021
b546c1d
introduce "noSpecificPartition" variable to improve readability
utku-caglayan Dec 21, 2021
c4dc047
make queue size and worker count configurable
utku-caglayan Dec 21, 2021
946ba9c
Merge branch 'master' into event-ordering
utku-caglayan Dec 21, 2021
eff8e8f
temporary fix for memory leak test
utku-caglayan Dec 21, 2021
22155c9
decrease queue capacity, and document the differences
utku-caglayan Dec 21, 2021
2f91eed
decrease queue capacity, and document the differences
utku-caglayan Dec 22, 2021
16dd0b2
remove event executor configuration
utku-caglayan Dec 24, 2021
7ae872c
address pr reviews for stripe executor
utku-caglayan Dec 24, 2021
158c62e
fix failing "TestMarshalDefaultConfig" test
utku-caglayan Dec 24, 2021
3491021
fix failing test
utku-caglayan Dec 24, 2021
c1efb0c
rename "ind" to "i"
utku-caglayan Dec 24, 2021
3e90236
apply review suggestions
utku-caglayan Dec 29, 2021
b574499
add map event order test
utku-caglayan Dec 29, 2021
e51f5d3
increase TestClientStartShutdownMemoryLeak memory limit
utku-caglayan Dec 29, 2021
9011983
refactor TestClientEventHandlingOrder to group events by partitionID
utku-caglayan Dec 29, 2021
f7df88d
refactor int32 to int
utku-caglayan Dec 29, 2021
03fe449
refactor TestClientEventHandlingOrder logic
utku-caglayan Dec 29, 2021
1c7a575
change behavior to not block on "dispatch" if event queue is full
utku-caglayan Dec 29, 2021
2b65d5a
remove unused func
utku-caglayan Dec 29, 2021
3bb2d0f
refactor return of "dispatch"
utku-caglayan Dec 30, 2021
ab26f81
undo older bad merge
utku-caglayan Dec 30, 2021
6faa426
fix test to start partitionIDs from 0
utku-caglayan Dec 30, 2021
db4dcd6
improve tests
utku-caglayan Dec 30, 2021
99b7971
add dispatch zero&negative key test
utku-caglayan Dec 30, 2021
67d42e5
refactor constructor to raise panic on invalid conf
utku-caglayan Dec 30, 2021
4d0c38c
refactor test for stability
utku-caglayan Dec 30, 2021
f8f778f
refactor stripeExecutor to pointer semantics
utku-caglayan Dec 30, 2021
50b5bd5
minor improvement
utku-caglayan Dec 30, 2021
feab9ed
fix major bug
utku-caglayan Dec 30, 2021
df8e738
implement a blackbox event order test that fails on previous event or…
utku-caglayan Dec 30, 2021
5fadb18
rename a helper func
utku-caglayan Dec 30, 2021
76ee128
refactor struct for padding
utku-caglayan Dec 31, 2021
116 changes: 113 additions & 3 deletions client_it_test.go
@@ -23,6 +23,7 @@ import (
"log"
"reflect"
"runtime"
"sort"
"sync"
"sync/atomic"
"testing"
@@ -35,7 +36,9 @@ import (
"github.com/hazelcast/hazelcast-go-client/hzerrors"
"github.com/hazelcast/hazelcast-go-client/internal"
"github.com/hazelcast/hazelcast-go-client/internal/it"
"github.com/hazelcast/hazelcast-go-client/internal/murmur"
"github.com/hazelcast/hazelcast-go-client/internal/proxy"
"github.com/hazelcast/hazelcast-go-client/internal/serialization"
"github.com/hazelcast/hazelcast-go-client/logger"
"github.com/hazelcast/hazelcast-go-client/types"
)
@@ -140,6 +143,115 @@ func TestClientMemberEvents(t *testing.T) {
})
}

func TestClientEventOrder(t *testing.T) {
it.MapTester(t, func(t *testing.T, m *hz.Map) {
ctx := context.Background()
// events should be processed in this order
const (
noPrevEvent = 0
addEvent = 1
removeEvent = 2
)
// populate event order checkers
var checkers []*int32
for i := 0; i < 20; i++ {
var state int32
checkers = append(checkers, &state)
}
// init listener conf
var c hz.MapEntryListenerConfig
c.NotifyEntryAdded(true)
c.NotifyEntryRemoved(true)
var tasks sync.WaitGroup
// add and remove are separate tasks
tasks.Add(len(checkers) * 2)
it.MustValue(m.AddEntryListener(ctx, c, func(e *hz.EntryNotified) {
state := checkers[e.Key.(int64)]
switch e.EventType {
case hz.EntryAdded:
if !atomic.CompareAndSwapInt32(state, noPrevEvent, noPrevEvent) {
panic("order is not preserved")
}
// keep the executor busy, make sure remove event is not processed before this
time.Sleep(500 * time.Millisecond)
if !atomic.CompareAndSwapInt32(state, noPrevEvent, addEvent) {
panic("order is not preserved")
}
tasks.Done()
case hz.EntryRemoved:
if !atomic.CompareAndSwapInt32(state, addEvent, removeEvent) {
panic("order is not preserved")
}
tasks.Done()
}
}))
for i := range checkers {
tmp := i
go func(index int) {
it.MustValue(m.Put(ctx, index, "test"))
it.MustValue(m.Remove(ctx, index))
}(tmp)
}
tasks.Wait()
})
}

func calculatePartitionID(ss *serialization.Service, key interface{}) (int32, error) {
kd, err := ss.ToData(key)
if err != nil {
return 0, err
}
return murmur.HashToIndex(kd.PartitionHash(), 271), nil
}

func TestClientEventHandlingOrder(t *testing.T) {
// Create custom cluster, and client from it
cls := it.StartNewClusterWithOptions("event-order-test-cluster", 15701, it.MemberCount())
defer cls.Shutdown()
conf := cls.DefaultConfig()
ctx := context.Background()
c := it.MustValue(hz.StartNewClientWithConfig(ctx, conf)).(*hz.Client)
defer c.Shutdown(ctx)
ss := it.MustValue(serialization.NewService(&conf.Serialization)).(*serialization.Service)
// Create test map
m := it.MustValue(c.GetMap(ctx, "TestClientEventHandlingOrder")).(*hz.Map)
var lc hz.MapEntryListenerConfig
lc.NotifyEntryAdded(true)
var (
// have 271 partitions by default
partitionToEvent = make([][]int, 271)
// wait for all events to be processed
wg sync.WaitGroup
// access it with atomic package
count int32
)
wg.Add(1)
handler := func(event *hz.EntryNotified) {
// it is okay to use conversion, since greatest key is 1000
key := int(event.Key.(int64))
pid, err := calculatePartitionID(ss, key)
if err != nil {
panic(err)
}
partitionToEvent[pid] = append(partitionToEvent[pid], key)
if atomic.AddInt32(&count, 1) == 1000 {
// last event processed
wg.Done()
}
}
it.MustValue(m.AddEntryListener(ctx, lc, handler))
for i := 1; i <= 1000; i++ {
it.MustValue(m.Put(ctx, i, "test"))
}
Comment on lines +244 to +246

Collaborator: Could you refactor this so m.Puts run concurrently?

Contributor Author: Instead implemented a blackbox one as we discussed df8e738

wg.Wait()
for _, keys := range partitionToEvent {
if !sort.IntsAreSorted(keys) {
t.Fatalf("events are not processed in order, event keys:\n%v\n", keys)
}
}
}

func TestClientHeartbeat(t *testing.T) {
// Slow test.
t.SkipNow()
@@ -618,7 +730,7 @@ func TestClientStartShutdownMemoryLeak(t *testing.T) {
ctx := context.Background()
var max uint64
var m runtime.MemStats
const limit = 8 * 1024 * 1024 // 8 MB
const limit = 16 * 1024 * 1024 // 16 MB
runtime.GC()
runtime.ReadMemStats(&m)
base := m.Alloc
@@ -634,8 +746,6 @@ func TestClientStartShutdownMemoryLeak(t *testing.T) {
t.Logf("memory allocation: %d at iteration: %d", m.Alloc, i)
if m.Alloc > base && m.Alloc-base > limit {
max = m.Alloc - base
}
if max > limit {
t.Fatalf("memory allocation: %d > %d (base: %d) at iteration: %d", max, limit, base, i)
}
}
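A note on what the ordering check above actually asserts: the client only guarantees ordering for events that belong to the same partition, which is why the test groups the observed keys by partition ID and verifies each group is ascending rather than expecting one global order. A minimal, self-contained sketch of that grouping check (the helper name and event struct are illustrative, not part of the PR):

package ordering

import (
	"sort"
	"testing"
)

// keyedEvent is an illustrative stand-in for an observed map event.
type keyedEvent struct {
	PartitionID int32
	Key         int
}

// assertPerPartitionOrder groups keys by partition and checks that each group
// is ascending. A single global sorted check would be too strict, because only
// same-partition events share a queue and therefore an ordering guarantee.
func assertPerPartitionOrder(t *testing.T, events []keyedEvent) {
	t.Helper()
	groups := make(map[int32][]int)
	for _, e := range events {
		groups[e.PartitionID] = append(groups[e.PartitionID], e.Key)
	}
	for pid, keys := range groups {
		if !sort.IntsAreSorted(keys) {
			t.Fatalf("partition %d: event keys out of order: %v", pid, keys)
		}
	}
}
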
30 changes: 21 additions & 9 deletions internal/invocation/invocation_service.go
@@ -41,23 +41,24 @@ type Handler interface {
}

type Service struct {
handler Handler
requestCh chan Invocation
urgentRequestCh chan Invocation
responseCh chan *proto.ClientMessage
// removeCh carries correlationIDs to be removed
removeCh chan int64
doneCh chan struct{}
groupLostCh chan *GroupLostEvent
invocations map[int64]Invocation
handler Handler
urgentRequestCh chan Invocation
eventDispatcher *event.DispatchService
logger logger.LogAdaptor
state int32
// removeCh carries correlationIDs to be removed
removeCh chan int64
executor *stripeExecutor
logger logger.LogAdaptor
state int32
}

func NewService(
handler Handler,
eventDispacher *event.DispatchService,
eventDispatcher *event.DispatchService,
logger logger.LogAdaptor) *Service {
s := &Service{
requestCh: make(chan Invocation),
@@ -68,15 +69,17 @@ func NewService(
groupLostCh: make(chan *GroupLostEvent),
invocations: map[int64]Invocation{},
handler: handler,
eventDispatcher: eventDispacher,
eventDispatcher: eventDispatcher,
logger: logger,
state: ready,
executor: newStripeExecutor(),
}
s.eventDispatcher.Subscribe(EventGroupLost, serviceSubID, func(event event.Event) {
go func() {
s.groupLostCh <- event.(*GroupLostEvent)
}()
})
s.executor.start()
go s.processIncoming()
return s
}
@@ -85,6 +88,7 @@ func (s *Service) Stop() {
if !atomic.CompareAndSwapInt32(&s.state, ready, stopped) {
return
}
s.executor.stop()
close(s.doneCh)
}

@@ -183,7 +187,15 @@ func (s *Service) handleClientMessage(msg *proto.ClientMessage) {
return fmt.Sprintf("invocation with unknown correlation ID: %d", correlationID)
})
} else if inv.EventHandler() != nil {
go inv.EventHandler()(msg)
handler := func() {
inv.EventHandler()(msg)
}
partitionID := msg.PartitionID()
// events with no specific partition (-1) are dispatched to a random queue inside dispatch.
ok := s.executor.dispatch(int(partitionID), handler)
if !ok {
s.logger.Warnf("event could not be processed, corresponding queue is full. PartitionID: %d, CorrelationID: %d", partitionID, correlationID)
}
}
return
}
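For context on why the handler path changed: the replaced line above, "go inv.EventHandler()(msg)", started a new goroutine per event, so two events for the same key could be handled in either order. A toy, standalone illustration of that hazard (not code from the PR):

package main

import (
	"fmt"
	"sync"
	"time"
)

// Spawning one goroutine per event gives no ordering guarantee: the "added"
// handler may be delayed by the scheduler and finish after the "removed" one.
func main() {
	var wg sync.WaitGroup
	events := []string{"added", "removed"} // delivery order for the same key
	wg.Add(len(events))
	for _, e := range events {
		e := e
		go func() { // analogous to the old `go inv.EventHandler()(msg)`
			defer wg.Done()
			if e == "added" {
				time.Sleep(10 * time.Millisecond) // any scheduling delay flips the order
			}
			fmt.Println("handled:", e)
		}()
	}
	wg.Wait() // frequently prints "removed" before "added"
}

Routing both handlers through the same striped queue, as the new code does, removes this reordering without serializing events from unrelated partitions.
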
93 changes: 93 additions & 0 deletions internal/invocation/stripe_executor.go
@@ -0,0 +1,93 @@
package invocation

import (
"math/rand"
"runtime"
"sync"
)

var (
// Default values differ from java impl. Also queue size is calculated differently.
// Java Client: queueSize per worker = defaultEventQueueCapacity / defaultEventWorkerCount
// Go Client: queueSize per worker = defaultEventQueueCapacity
defaultEventQueueCapacity = 10000
defaultEventWorkerCount = runtime.NumCPU()
)

// executor represents the function that will run on workers of stripeExecutor.
type executor func(queue chan func(), quit chan struct{}, wg *sync.WaitGroup)

// stripeExecutor executes submitted tasks, preserving order among tasks dispatched with the same key.
type stripeExecutor struct {
quit chan struct{}
execFn executor
taskQueues []chan func()
queueCount int
wg sync.WaitGroup
}

// newStripeExecutor returns a new stripeExecutor with default configuration.
func newStripeExecutor() *stripeExecutor {
return newStripeExecutorWithConfig(defaultEventWorkerCount, defaultEventQueueCapacity)
}

// newStripeExecutorWithConfig returns a new stripeExecutor with the given queueCount and queueSize. It panics if either parameter is not greater than zero.
func newStripeExecutorWithConfig(queueCount, queueSize int) *stripeExecutor {
if queueCount <= 0 {
panic("queueCount must be greater than 0")
}
if queueSize <= 0 {
panic("queueSize must be greater than 0")
}
se := stripeExecutor{
taskQueues: make([]chan func(), queueCount),
queueCount: queueCount,
}
for i := range se.taskQueues {
se.taskQueues[i] = make(chan func(), queueSize)
}
se.quit = make(chan struct{})
se.execFn = defaultExecFn
return &se
}

// start fires up the workers for each queue.
func (se *stripeExecutor) start() {
se.wg.Add(se.queueCount)
for i := range se.taskQueues {
go se.execFn(se.taskQueues[i], se.quit, &se.wg)
}
}

// dispatch sends the task to the queue selected by its key; tasks with the same key end up on the same queue. It returns false if that queue is full and the task could not be dispatched.
func (se *stripeExecutor) dispatch(key int, task func()) bool {
if key < 0 {
// dispatch random.
key = rand.Intn(se.queueCount)
}
select {
case se.taskQueues[key%se.queueCount] <- task:
default:
// do not block if queue is full.
return false
}
return true
}

// stop blocks until all workers are stopped.
func (se *stripeExecutor) stop() {
close(se.quit)
se.wg.Wait()
}

func defaultExecFn(queue chan func(), quit chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case task := <-queue:
task()
case <-quit:
return
}
}
}
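
To make the intended lifecycle concrete, here is an illustrative test-style sketch of using the executor above; it assumes the file sits in the same package (internal/invocation) so the unexported identifiers are visible, and the task bodies are placeholders:

package invocation

import (
	"fmt"
	"testing"
	"time"
)

// TestStripeExecutorUsage is an illustrative sketch, not part of the PR.
func TestStripeExecutorUsage(t *testing.T) {
	exec := newStripeExecutorWithConfig(4, 100) // 4 workers, 100 buffered tasks per queue
	exec.start()

	// Tasks dispatched with the same key land on the same queue, so they run
	// in submission order relative to each other.
	for i := 0; i < 3; i++ {
		i := i
		if !exec.dispatch(7, func() { fmt.Println("key 7, task", i) }) {
			t.Log("queue full, task dropped") // dispatch never blocks
		}
	}
	// A negative key means "no specific partition": a random queue is chosen.
	exec.dispatch(-1, func() { fmt.Println("unkeyed task") })

	time.Sleep(100 * time.Millisecond) // crude wait so the queued tasks get to run
	exec.stop()                        // closes quit and waits for the workers to return
}

Since dispatch never blocks, sustained overload means dropped tasks; the invocation service above logs a warning in that case instead of stalling the reader path.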