-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(schema/appdata): async listener mux'ing (#20879)
Co-authored-by: cool-developer <[email protected]>
- Loading branch information
1 parent
897f4f8
commit 683371f
Showing
4 changed files
with
569 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,162 @@ | ||
package appdata | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
) | ||
|
||
// AsyncListenerOptions are options for async listeners and listener mux's. | ||
type AsyncListenerOptions struct { | ||
// Context is the context whose Done() channel listeners use will use to listen for completion to close their | ||
// goroutine. If it is nil, then context.Background() will be used and goroutines may be leaked. | ||
Context context.Context | ||
|
||
// BufferSize is the buffer size of the channels to use. It defaults to 0. | ||
BufferSize int | ||
|
||
// DoneWaitGroup is an optional wait-group that listener goroutines will notify via Add(1) when they are started | ||
// and Done() after they are cancelled and completed. | ||
DoneWaitGroup *sync.WaitGroup | ||
} | ||
|
||
// AsyncListenerMux returns a listener that forwards received events to all the provided listeners asynchronously | ||
// with each listener processing in a separate go routine. All callbacks in the returned listener will return nil | ||
// except for Commit which will return an error or nil once all listeners have processed the commit. The context | ||
// is used to signal that the listeners should stop listening and return. bufferSize is the size of the buffer for the | ||
// channels used to send events to the listeners. | ||
func AsyncListenerMux(opts AsyncListenerOptions, listeners ...Listener) Listener { | ||
asyncListeners := make([]Listener, len(listeners)) | ||
commitChans := make([]chan error, len(listeners)) | ||
for i, l := range listeners { | ||
commitChan := make(chan error) | ||
commitChans[i] = commitChan | ||
asyncListeners[i] = AsyncListener(opts, commitChan, l) | ||
} | ||
mux := ListenerMux(asyncListeners...) | ||
muxCommit := mux.Commit | ||
mux.Commit = func(data CommitData) error { | ||
if muxCommit != nil { | ||
err := muxCommit(data) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
for _, commitChan := range commitChans { | ||
err := <-commitChan | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
return mux | ||
} | ||
|
||
// AsyncListener returns a listener that forwards received events to the provided listener listening in asynchronously | ||
// in a separate go routine. The listener that is returned will return nil for all methods including Commit and | ||
// an error or nil will only be returned in commitChan once the sender has sent commit and the receiving listener has | ||
// processed it. Thus commitChan can be used as a synchronization and error checking mechanism. The go routine | ||
// that is being used for listening will exit when context.Done() returns and no more events will be received by the listener. | ||
// bufferSize is the size of the buffer for the channel that is used to send events to the listener. | ||
// Instead of using AsyncListener directly, it is recommended to use AsyncListenerMux which does coordination directly | ||
// via its Commit callback. | ||
func AsyncListener(opts AsyncListenerOptions, commitChan chan<- error, listener Listener) Listener { | ||
packetChan := make(chan Packet, opts.BufferSize) | ||
res := Listener{} | ||
ctx := opts.Context | ||
if ctx == nil { | ||
ctx = context.Background() | ||
} | ||
done := ctx.Done() | ||
|
||
go func() { | ||
if opts.DoneWaitGroup != nil { | ||
opts.DoneWaitGroup.Add(1) | ||
} | ||
|
||
var err error | ||
for { | ||
select { | ||
case packet := <-packetChan: | ||
if err != nil { | ||
// if we have an error, don't process any more packets | ||
// and return the error and finish when it's time to commit | ||
if _, ok := packet.(CommitData); ok { | ||
commitChan <- err | ||
return | ||
} | ||
} else { | ||
// process the packet | ||
err = listener.SendPacket(packet) | ||
// if it's a commit | ||
if _, ok := packet.(CommitData); ok { | ||
commitChan <- err | ||
if err != nil { | ||
return | ||
} | ||
} | ||
} | ||
|
||
case <-done: | ||
close(packetChan) | ||
if opts.DoneWaitGroup != nil { | ||
opts.DoneWaitGroup.Done() | ||
} | ||
return | ||
} | ||
} | ||
}() | ||
|
||
if listener.InitializeModuleData != nil { | ||
res.InitializeModuleData = func(data ModuleInitializationData) error { | ||
packetChan <- data | ||
return nil | ||
} | ||
} | ||
|
||
if listener.StartBlock != nil { | ||
res.StartBlock = func(data StartBlockData) error { | ||
packetChan <- data | ||
return nil | ||
} | ||
} | ||
|
||
if listener.OnTx != nil { | ||
res.OnTx = func(data TxData) error { | ||
packetChan <- data | ||
return nil | ||
} | ||
} | ||
|
||
if listener.OnEvent != nil { | ||
res.OnEvent = func(data EventData) error { | ||
packetChan <- data | ||
return nil | ||
} | ||
} | ||
|
||
if listener.OnKVPair != nil { | ||
res.OnKVPair = func(data KVPairData) error { | ||
packetChan <- data | ||
return nil | ||
} | ||
} | ||
|
||
if listener.OnObjectUpdate != nil { | ||
res.OnObjectUpdate = func(data ObjectUpdateData) error { | ||
packetChan <- data | ||
return nil | ||
} | ||
} | ||
|
||
if listener.Commit != nil { | ||
res.Commit = func(data CommitData) error { | ||
packetChan <- data | ||
return nil | ||
} | ||
} | ||
|
||
return res | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
package appdata | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
"testing" | ||
) | ||
|
||
func TestAsyncListenerMux(t *testing.T) { | ||
t.Run("empty", func(t *testing.T) { | ||
listener := AsyncListenerMux(AsyncListenerOptions{}, Listener{}, Listener{}) | ||
|
||
if listener.InitializeModuleData != nil { | ||
t.Error("expected nil") | ||
} | ||
if listener.StartBlock != nil { | ||
t.Error("expected nil") | ||
} | ||
if listener.OnTx != nil { | ||
t.Error("expected nil") | ||
} | ||
if listener.OnEvent != nil { | ||
t.Error("expected nil") | ||
} | ||
if listener.OnKVPair != nil { | ||
t.Error("expected nil") | ||
} | ||
if listener.OnObjectUpdate != nil { | ||
t.Error("expected nil") | ||
} | ||
|
||
// commit is not expected to be nil | ||
}) | ||
|
||
t.Run("call cancel", func(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
wg := &sync.WaitGroup{} | ||
var calls1, calls2 []string | ||
listener1 := callCollector(1, func(name string, _ int, _ Packet) { | ||
calls1 = append(calls1, name) | ||
}) | ||
listener2 := callCollector(2, func(name string, _ int, _ Packet) { | ||
calls2 = append(calls2, name) | ||
}) | ||
res := AsyncListenerMux(AsyncListenerOptions{ | ||
BufferSize: 16, Context: ctx, DoneWaitGroup: wg, | ||
}, listener1, listener2) | ||
|
||
callAllCallbacksOnces(t, res) | ||
|
||
expectedCalls := []string{ | ||
"InitializeModuleData", | ||
"StartBlock", | ||
"OnTx", | ||
"OnEvent", | ||
"OnKVPair", | ||
"OnObjectUpdate", | ||
"Commit", | ||
} | ||
|
||
checkExpectedCallOrder(t, calls1, expectedCalls) | ||
checkExpectedCallOrder(t, calls2, expectedCalls) | ||
|
||
// cancel and expect the test to finish - if all goroutines aren't canceled the test will hang | ||
cancel() | ||
wg.Wait() | ||
}) | ||
|
||
t.Run("error on commit", func(t *testing.T) { | ||
var calls1, calls2 []string | ||
listener1 := callCollector(1, func(name string, _ int, _ Packet) { | ||
calls1 = append(calls1, name) | ||
}) | ||
listener1.Commit = func(data CommitData) error { | ||
return fmt.Errorf("error") | ||
} | ||
listener2 := callCollector(2, func(name string, _ int, _ Packet) { | ||
calls2 = append(calls2, name) | ||
}) | ||
res := AsyncListenerMux(AsyncListenerOptions{}, listener1, listener2) | ||
|
||
err := res.Commit(CommitData{}) | ||
if err == nil || err.Error() != "error" { | ||
t.Fatalf("expected error, got %v", err) | ||
} | ||
}) | ||
} | ||
|
||
func TestAsyncListener(t *testing.T) { | ||
t.Run("call cancel", func(t *testing.T) { | ||
commitChan := make(chan error) | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
wg := &sync.WaitGroup{} | ||
var calls []string | ||
listener := callCollector(1, func(name string, _ int, _ Packet) { | ||
calls = append(calls, name) | ||
}) | ||
res := AsyncListener(AsyncListenerOptions{BufferSize: 16, Context: ctx, DoneWaitGroup: wg}, | ||
commitChan, listener) | ||
|
||
callAllCallbacksOnces(t, res) | ||
|
||
err := <-commitChan | ||
if err != nil { | ||
t.Fatalf("expected nil, got %v", err) | ||
} | ||
|
||
checkExpectedCallOrder(t, calls, []string{ | ||
"InitializeModuleData", | ||
"StartBlock", | ||
"OnTx", | ||
"OnEvent", | ||
"OnKVPair", | ||
"OnObjectUpdate", | ||
"Commit", | ||
}) | ||
|
||
calls = nil | ||
|
||
// expect wait group to return after cancel is called | ||
cancel() | ||
wg.Wait() | ||
}) | ||
|
||
t.Run("error", func(t *testing.T) { | ||
commitChan := make(chan error) | ||
var calls []string | ||
listener := callCollector(1, func(name string, _ int, _ Packet) { | ||
calls = append(calls, name) | ||
}) | ||
|
||
listener.OnKVPair = func(updates KVPairData) error { | ||
return fmt.Errorf("error") | ||
} | ||
|
||
res := AsyncListener(AsyncListenerOptions{BufferSize: 16}, commitChan, listener) | ||
|
||
callAllCallbacksOnces(t, res) | ||
|
||
err := <-commitChan | ||
if err == nil || err.Error() != "error" { | ||
t.Fatalf("expected error, got %v", err) | ||
} | ||
|
||
checkExpectedCallOrder(t, calls, []string{"InitializeModuleData", "StartBlock", "OnTx", "OnEvent"}) | ||
}) | ||
} |
Oops, something went wrong.