-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathstreams.go
85 lines (75 loc) · 1.65 KB
/
streams.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package gofcgisrv
import (
"bytes"
"io"
"sync"
)
// streamReader is an in-memory pipe that is safe for concurrent use: Write
// appends bytes to an internal buffer, Read blocks until bytes are available
// or the stream has been closed, and Close marks the end of the stream.
type streamReader struct {
	buffer  bytes.Buffer // bytes written but not yet read
	lock    sync.Mutex   // guards buffer and err
	gotData *sync.Cond   // bound to lock; notified when buffer grows or err is set
	err     error        // nil while open, io.EOF once Close has been called
}

// newStreamReader returns an empty, open streamReader whose condition
// variable is bound to its own mutex.
func newStreamReader() *streamReader {
	s := new(streamReader)
	s.gotData = sync.NewCond(&s.lock)
	return s
}

// Read blocks until buffered data is available or the stream is closed.
// Buffered data is always delivered first; only once the buffer is empty and
// the stream has been closed does Read return 0 and the stored error (io.EOF).
func (sr *streamReader) Read(data []byte) (int, error) {
	sr.lock.Lock()
	defer sr.lock.Unlock()
	// Wait in a loop: a woken reader must re-check, because another reader
	// may already have drained the buffer.
	for sr.buffer.Len() == 0 && sr.err == nil {
		sr.gotData.Wait()
	}
	if sr.buffer.Len() == 0 {
		return 0, sr.err
	}
	return sr.buffer.Read(data)
}

// Write appends data to the buffer and wakes blocked readers. After Close it
// writes nothing and returns 0 together with the stored error (io.EOF).
func (sr *streamReader) Write(data []byte) (int, error) {
	sr.lock.Lock()
	defer sr.lock.Unlock()
	if sr.err != nil {
		return 0, sr.err
	}
	n, err := sr.buffer.Write(data)
	// Broadcast rather than Signal: one write may hold enough bytes for
	// several waiting readers, and each re-checks the buffer after waking.
	sr.gotData.Broadcast()
	return n, err
}

// Close marks the stream as finished. Data already buffered can still be
// read; afterwards Read returns io.EOF. Close always returns nil.
func (sr *streamReader) Close() error {
	sr.lock.Lock()
	defer sr.lock.Unlock()
	sr.err = io.EOF
	// Every blocked reader has to observe the end of stream; Signal would
	// wake only one of them and leave the others waiting forever.
	sr.gotData.Broadcast()
	return nil
}
// streamWriter writes data as FCGI records.
// Each non-empty Write is handed to writeRecord as one record carrying tp and
// id; Close hands writeRecord a record with a nil payload.
type streamWriter struct {
// w is the destination passed to writeRecord.
w io.Writer
// tp is the record type placed in every record this writer builds.
tp recordType
// id is the request id placed in every record this writer builds.
id requestId
// lock is held by Write for the duration of each call.
lock sync.Mutex
}
// newStreamWriter returns a streamWriter that builds records of type tp for
// request id and sends them to w. The mutex starts in its zero (unlocked)
// state.
func newStreamWriter(w io.Writer, tp recordType, id requestId) *streamWriter {
	sw := new(streamWriter)
	sw.w = w
	sw.tp = tp
	sw.id = id
	return sw
}
// Write sends data to the underlying writer as a single record, holding the
// writer's lock for the whole call. An empty slice is ignored and reported as
// (0, nil). On success it returns len(data); if writeRecord fails it returns
// 0 and that error.
func (sw *streamWriter) Write(data []byte) (int, error) {
	sw.lock.Lock()
	defer sw.lock.Unlock()

	size := len(data)
	if size == 0 {
		// Nothing to send; no record is written for empty input.
		return 0, nil
	}

	if err := writeRecord(sw.w, record{sw.tp, sw.id, data}); err != nil {
		// writeRecord does not say how much reached the wire, so a failure
		// is reported as zero bytes written.
		return 0, err
	}
	return size, nil
}
// Close ends the stream by writing a record with an empty payload, and
// returns whatever error writeRecord reports.
//
// It takes the same lock as Write so that the terminating record cannot be
// interleaved with a record another goroutine is still writing to the shared
// underlying writer.
func (sw *streamWriter) Close() error {
	sw.lock.Lock()
	defer sw.lock.Unlock()
	// Close means writing an empty record.
	return writeRecord(sw.w, record{sw.tp, sw.id, nil})
}