-
Notifications
You must be signed in to change notification settings - Fork 391
/
Copy pathsubscribe.go
75 lines (67 loc) · 2.14 KB
/
subscribe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package events
import (
"log"
"reflect"
"time"
)
// Returns a synchronous event emitter.
func Subscribe(evsw EventSwitch, listenerID string) <-chan Event {
ch := make(chan Event, 0) // synchronous
return SubscribeOn(evsw, listenerID, ch)
}
// Like Subscribe, but lets the caller construct a channel. If the capacity of
// the provided channel is 0, it will be called synchronously; otherwise, it
// will drop when the capacity is reached and a select doesn't immediately
// send.
func SubscribeOn(evsw EventSwitch, listenerID string, ch chan Event) <-chan Event {
return SubscribeFilteredOn(evsw, listenerID, nil, ch)
}
func SubscribeToEvent(evsw EventSwitch, listenerID string, protoevent Event) <-chan Event {
ch := make(chan Event, 0) // synchronous
return SubscribeToEventOn(evsw, listenerID, protoevent, ch)
}
func SubscribeToEventOn(evsw EventSwitch, listenerID string, protoevent Event, ch chan Event) <-chan Event {
rt := reflect.TypeOf(protoevent)
return SubscribeFilteredOn(evsw, listenerID, func(event Event) bool {
return reflect.TypeOf(event) == rt
}, ch)
}
type EventFilter func(Event) bool
func SubscribeFiltered(evsw EventSwitch, listenerID string, filter EventFilter) <-chan Event {
ch := make(chan Event, 0)
return SubscribeFilteredOn(evsw, listenerID, filter, ch)
}
func SubscribeFilteredOn(evsw EventSwitch, listenerID string, filter EventFilter, ch chan Event) <-chan Event {
evsw.AddListener(listenerID, func(event Event) {
if filter != nil && !filter(event) {
return // filter
}
// NOTE: This callback must not block for performance.
if cap(ch) == 0 {
timeout := 10 * time.Second
LOOP:
for {
select { // sync
case ch <- event:
break LOOP
case <-evsw.Quit():
close(ch)
break LOOP
case <-time.After(timeout):
// After a minute, print a message for debugging.
log.Printf("[WARN] EventSwitch subscriber %v blocked on %v for %v", listenerID, event, timeout)
// Exponentially back off warning messages.
timeout *= 2
}
}
} else {
select {
case ch <- event:
default: // async
evsw.RemoveListener(listenerID) // TODO log
close(ch)
}
}
})
return ch
}