forked from fl00r/go-iproto
-
Notifications
You must be signed in to change notification settings - Fork 2
/
buffer.go
129 lines (115 loc) · 1.87 KB
/
buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package iproto
import (
"log"
"sync"
"sync/atomic"
)
// Reference log.Print so the "log" import stays in use even though the code
// visible in this file makes no logging calls.
var _ = log.Print
// bufBookmark is a single slot of a bufferRow: an embedded Bookmark plus the
// slot's lifecycle state.
type bufBookmark struct {
Bookmark
// state is one of bsNew, bsSet or bsFree; the zero value is bsNew.
state uint32
}
// Respond marks the slot as bsFree, which tells Buffer.loop that the slot
// needs no further work. The response r itself is not inspected.
//
// The store is atomic because state is also written by Buffer.push and read
// by Buffer.loop.
// NOTE(review): this assumes Respond can be invoked from a goroutine other
// than the one running Buffer.loop — confirm against the Bookmark contract.
func (b *bufBookmark) Respond(r *Response) {
	atomic.StoreUint32(&b.state, bsFree)
}
// Slot states stored in bufBookmark.state.
const (
// bsNew is the zero value: Buffer.push has not stored a state for the slot
// yet. Buffer.loop stops draining when it reaches such a slot.
bsNew = iota
// bsSet is stored by Buffer.push when Request.ChainBookmark returned true;
// Buffer.loop forwards the slot's request if it is still pending.
bsSet
// bsFree marks a slot that needs no further work; Buffer.loop skips it.
bsFree
)
const (
// bufRow is the number of bufBookmark slots held by one bufferRow.
bufRow = 1024
)
// bufferRow is a fixed-size chunk of buffer slots.
type bufferRow struct {
// id is the row's index (slot position / bufRow); it is also the row's key
// in Buffer.rows.
id uint64
// row holds the slots themselves.
row [bufRow]bufBookmark
}
// Buffer queues requests that push could not hand to ch immediately; loop
// forwards them to ch later.
type Buffer struct {
// ch receives requests, either directly from push or later from loop.
ch chan *Request
// onExit, if non-nil, is called once loop returns.
onExit func()
// set is the wake-up signal for loop; init creates it with capacity 1 and
// close closes it.
set chan bool
// m guards rows.
m sync.Mutex
// rows maps a row id to its row.
rows map[uint64]*bufferRow
// head is the position of the next slot loop will examine; tail is the
// position of the next slot push will reserve (advanced with
// atomic.AddUint64).
// NOTE(review): 64-bit atomics need 64-bit alignment on 32-bit platforms —
// keep that in mind before reordering fields.
head, tail uint64
// hRow caches the row loop is currently draining; tRow caches the row most
// recently created by push.
hRow, tRow *bufferRow
}
// init prepares an empty Buffer: it registers row 0 in rows, points both the
// head and tail row caches at it, and creates the wake-up channel with room
// for a single pending signal.
func (b *Buffer) init() {
	first := &bufferRow{}
	b.rows = map[uint64]*bufferRow{0: first}
	b.hRow = first
	b.tRow = first
	b.set = make(chan bool, 1)
}
// push delivers r to ch, directly when possible and through the buffer
// otherwise.
//
// If nothing is buffered (tail == head) it first tries a non-blocking send on
// ch. If that is not possible it reserves the next slot, chains the slot's
// bookmark to r, records the slot state and wakes loop.
//
// tail and head are read atomically because tail is advanced with
// atomic.AddUint64 and head is advanced by loop. The slot state is published
// with an atomic store because loop and bufBookmark.Respond access the same
// word.
func (b *Buffer) push(r *Request) {
	// Fast path: the buffer is empty, so try to hand the request over directly.
	if atomic.LoadUint64(&b.tail) == atomic.LoadUint64(&b.head) {
		select {
		case b.ch <- r:
			return
		default:
		}
	}

	// Reserve a slot; AddUint64 returns the new tail, so the reserved
	// position is one less.
	tail := atomic.AddUint64(&b.tail, 1) - 1
	big := tail / bufRow

	// The cached tail row usually matches. Otherwise look the row up under
	// the lock and create it if this is the first slot reserved in it.
	row := b.tRow
	if row.id != big {
		var ok bool
		b.m.Lock()
		if row, ok = b.rows[big]; !ok {
			row = &bufferRow{id: big}
			b.rows[big] = row
			b.tRow = row
		}
		b.m.Unlock()
	}

	middle := &row.row[tail%bufRow]

	// A slot whose bookmark was not chained has nothing to forward, so it is
	// marked free right away and loop will simply step over it.
	state := uint32(bsFree)
	if r.ChainBookmark(middle) {
		state = bsSet
	}
	atomic.StoreUint32(&middle.state, state)

	// Wake loop without blocking; a signal that is already pending is enough.
	select {
	case b.set <- true:
	default:
	}
}
// close posts one last non-blocking wake-up on set and then closes it. Once
// the channel is drained, the receive in loop yields false and loop ends.
//
// push must not be called after close: its send on the closed channel would
// panic.
func (b *Buffer) close() {
	wake := b.set
	select {
	default:
	case wake <- true:
	}
	close(wake)
}
// loop drains the buffer. Each value received from set triggers one pass
// over the slots between head and tail; loop ends when set is closed (the
// receive then yields false) and finally calls onExit if it is non-nil.
//
// head is advanced with an atomic add because push reads it. The slot state
// is loaded and stored atomically because push and bufBookmark.Respond write
// the same word.
func (b *Buffer) loop() {
	for <-b.set {
	Tiny:
		for ; b.head < atomic.LoadUint64(&b.tail); atomic.AddUint64(&b.head, 1) {
			big := b.head / bufRow
			row := b.hRow
			if row.id != big {
				// head crossed into the next row: fetch it and drop the
				// previous, fully drained row from the map.
				var ok bool
				b.m.Lock()
				row, ok = b.rows[big]
				delete(b.rows, big-1)
				b.m.Unlock()
				if !ok {
					// The slot is reserved but push has not created the row
					// yet; wait for the next wake-up.
					break Tiny
				}
				b.hRow = row
			}

			middle := &row.row[b.head%bufRow]
			switch atomic.LoadUint32(&middle.state) {
			case bsNew:
				// push reserved this slot but has not stored its state yet;
				// it signals set once it has.
				break Tiny
			case bsSet:
				req := middle.Request
				atomic.StoreUint32(&middle.state, bsFree)
				if req != nil && req.IsPending() {
					// May block until the consumer of ch is ready; head stays
					// put meanwhile, so push keeps buffering.
					b.ch <- req
				}
			case bsFree:
				// Nothing to forward.
			}
		}
	}
	if b.onExit != nil {
		b.onExit()
	}
}