Skip to content

Commit

Permalink
refactor return of "dispatch"
Browse files Browse the repository at this point in the history
  • Loading branch information
utku-caglayan committed Dec 30, 2021
1 parent 2b65d5a commit 3bb2d0f
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 14 deletions.
4 changes: 2 additions & 2 deletions internal/invocation/invocation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ func (s *Service) handleClientMessage(msg *proto.ClientMessage) {
}
partitionID := inv.PartitionID()
// no specific partition (-1) are dispatched randomly in dispatch func.
qFull := s.executor.dispatch(int(partitionID), handler)
if qFull {
ok := s.executor.dispatch(int(partitionID), handler)
if !ok {
s.logger.Warnf("event could not be processed, corresponding queue is full. PartitionID: %d, CorrelationID: %d", partitionID, correlationID)
}
}
Expand Down
12 changes: 6 additions & 6 deletions internal/invocation/stripe_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,19 @@ func (se stripeExecutor) start() {
}
}

// dispatch sends the handler "task" to one of the appropriate taskQueues, "tasks" with the same key end up on the same queue. Returns true if queue is full and could not dispatch
func (se stripeExecutor) dispatch(key int, task func()) (queueFull bool) {
// dispatch sends the handler "task" to one of the appropriate taskQueues, "tasks" with the same key end up on the same queue. Returns false if queue is full and could not dispatch.
func (se stripeExecutor) dispatch(key int, task func()) bool {
if key < 0 {
// dispatch random
// dispatch random.
key = rand.Intn(se.queueCount)
}
select {
case se.taskQueues[key%se.queueCount] <- task:
default:
// do not block if queue is full
return true
// do not block if queue is full.
return false
}
return false
return true
}

// stop blocks until all workers are stopped.
Expand Down
16 changes: 10 additions & 6 deletions internal/invocation/stripe_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ func Test_serialExecutor_dispatch(t *testing.T) {
tmpHandler := func() {
panic(i)
}
go se.dispatch(tt.key, tmpHandler)
if ok := se.dispatch(tt.key, tmpHandler); !ok {
t.Fatal("could not dispatch handler")
}
select {
case <-se.taskQueues[tt.expectedIndex]:
case <-time.After(time.Second):
Expand All @@ -105,11 +107,11 @@ func Test_serialExecutor_dispatchQueueFull(t *testing.T) {
se, err := newStripeExecutorWithConfig(1, 1)
assert.Nil(t, err)
// executor not running, make the queue full
qFull := se.dispatch(1, func() {})
assert.False(t, qFull)
ok := se.dispatch(1, func() {})
assert.True(t, ok)
// expect unsuccessful dispatch
qFull = se.dispatch(1, func() {})
assert.True(t, qFull)
ok = se.dispatch(1, func() {})
assert.False(t, ok)
}

func Test_serialExecutor_start(t *testing.T) {
Expand Down Expand Up @@ -143,7 +145,9 @@ func Test_serialExecutor_start(t *testing.T) {
se.start()
go func() {
for _, task := range tasks {
se.dispatch(task.key, task.handler)
if ok := se.dispatch(task.key, task.handler); !ok {
panic("could not dispatch event handler")
}
}
}()
time.Sleep(time.Second * 1)
Expand Down

0 comments on commit 3bb2d0f

Please sign in to comment.