-
Notifications
You must be signed in to change notification settings - Fork 6
/
spsc.go
82 lines (75 loc) · 1.4 KB
/
spsc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package onering
import (
"sync/atomic"
)
// SPSC is a ring queue driven by Get/Consume on the reading side and Put on
// the writing side. Each side reads its own cursor without atomics and
// exchanges progress with the other side through atomically accessed fields.
// NOTE(review): the name suggests single-producer/single-consumer use only —
// confirm against the package documentation.
type SPSC struct {
// 64 bytes of leading padding; presumably keeps wc away from the cache line
// of whatever precedes the struct — TODO confirm.
_ [8]int64
// wc counts completed Put calls. Put bumps it with atomic.AddInt64 after the
// slot has been written; Get and Consume poll it with atomic.LoadInt64 once
// the batched wp cursor shows nothing left to read.
wc int64
// ring is embedded. The rc, rp, wp, data, mask, maxbatch and done fields used
// by the methods of SPSC are not declared here, so they are promoted from it.
ring
// Trailing padding; its purpose is not visible from this file —
// NOTE(review): possibly rounds the struct size, confirm.
_ [4]byte
}
// Get takes the next queued element and hands it, together with i, to inject.
// It returns true once an element has been delivered, and false if the ring
// is observed as done while Get is waiting for data.
//
// The fast path consults only the batched write cursor wp. When that shows
// nothing to read, Get publishes its own progress through rp (so a blocked
// Put can continue) and then polls the exact write counter wc.
//
// NOTE(review): on the slow path the done flag is tested before wc is
// re-read, so whether elements that were Put but not yet published via wp
// are still retrievable after the ring is closed depends on code outside
// this file — confirm.
func (r *SPSC) Get(i interface{}) bool {
	pos := r.rc
	avail := atomic.LoadInt64(&r.wp)
	if pos >= avail {
		// Nothing visible through wp: make our read progress visible first.
		if r.rp < pos {
			atomic.StoreInt64(&r.rp, pos)
		}
		for pos >= avail {
			if atomic.LoadInt32(&r.done) > 0 {
				return false
			}
			r.wait()
			// From here on poll wc, which Put advances on every element.
			avail = atomic.LoadInt64(&r.wc)
		}
	}
	inject(i, r.data[pos&r.mask])
	pos++
	r.rc = pos
	// Publish read progress only once more than maxbatch elements are pending.
	if pos-r.rp > r.maxbatch {
		atomic.StoreInt64(&r.rp, pos)
	}
	return true
}
// Consume feeds queued elements to the callback obtained from extractfn(i),
// one call per element, passing the callback an iterator and the slot value.
// It returns when the callback leaves the iterator's stop flag set, or when
// the ring is observed as done while there is nothing to read.
//
// Read progress is written to rc and published through rp after every
// drained run, and additionally inside a run whenever the per-run counter
// ANDed with maxbatch is zero.
func (r *SPSC) Consume(i interface{}) {
	var (
		handler = extractfn(i)
		batch   = int(r.maxbatch)
		cursor  iter
		running = true
	)
	for running {
		head := r.rc
		tail := atomic.LoadInt64(&r.wp)
		for head >= tail {
			if atomic.LoadInt32(&r.done) > 0 {
				return
			}
			r.wait()
			// After the first wait, poll the exact write counter instead of wp.
			tail = atomic.LoadInt64(&r.wc)
		}
		seen := 0
		for running && head < tail {
			seen++
			// assumes maxbatch has the form 2^k-1, making this a periodic
			// publish every maxbatch+1 elements — TODO confirm
			if seen&batch == 0 {
				r.rc = head
				atomic.StoreInt64(&r.rp, head)
			}
			handler(&cursor, r.data[head&r.mask])
			head++
			running = !cursor.stop
			cursor.inc()
		}
		r.rc = head
		atomic.StoreInt64(&r.rp, head)
	}
}
// Put stores the value produced by extractptr(i) in the next slot of the
// ring. While the published read cursor rp shows no free slot, Put calls
// r.wait and re-reads rp until one is available.
//
// Before waiting, Put publishes any still-unpublished writes through wp. The
// new element itself becomes visible through the atomic increment of wc; wp
// is then advanced only once it lags wc by more than maxbatch.
func (r *SPSC) Put(i interface{}) {
	pos := r.wc
	floor := pos - r.mask
	freed := atomic.LoadInt64(&r.rp)
	if floor >= freed {
		// Publish pending writes before waiting for rp to advance.
		if r.wp < pos {
			atomic.StoreInt64(&r.wp, pos)
		}
		for floor >= freed {
			r.wait()
			freed = atomic.LoadInt64(&r.rp)
		}
	}
	r.data[pos&r.mask] = extractptr(i)
	// The slot must be written before wc is incremented.
	next := atomic.AddInt64(&r.wc, 1)
	if next-r.wp > r.maxbatch {
		atomic.StoreInt64(&r.wp, next)
	}
}