-
Notifications
You must be signed in to change notification settings - Fork 5
/
index.js
222 lines (194 loc) · 4.65 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
var pull = require('pull-stream/pull')
var looper = require('looper')
// Tear down a wrapped node stream.
//
// When the stream exposes a `destroy` member it is invoked as a method of the
// stream. Otherwise nothing can be released, so a warning is written with
// console.error and the stream is left alone.
function destroy (stream) {
  if (stream.destroy) {
    stream.destroy()
    return
  }

  // The three fragments are joined without a separator; each one carries its
  // own trailing space/newline.
  console.error([
    'warning, stream-to-pull-stream: \n',
    'the wrapped node-stream does not implement `destroy`, \n',
    'this may cause resource leaks.'
  ].join(''))
}
// Drain a pull-stream source into a node writable stream.
//
//   read   - pull-stream source, called as read(abort, cb) with cb(end, data).
//   stream - node writable; `write`, `end`, `on`, `once` and `removeListener`
//            are used, plus the `_isStdio` flag.
//   cb     - optional. Called at most once: with null when the transfer ended
//            cleanly (`ended === true`), otherwise with the error that ended it.
//
// Reading starts on the next tick, after the listeners below are attached.
function write(read, stream, cb) {
  var ended, closed = false, did

  // Report the outcome exactly once.
  function done () {
    if(did) return
    did = true
    cb && cb(ended === true ? null : ended)
  }

  // Shared handler for 'close' and 'finish'.
  function onClose () {
    if(closed) return
    closed = true
    cleanup()
    // If the stream closed before the source ended, abort the source first.
    if(!ended) read(ended = true, done)
    else done()
  }

  function onError (err) {
    cleanup()
    if(!ended) read(ended = err, done)
    else if(ended === true) {
      // The source already ended cleanly and we were only waiting for
      // 'finish'. Report the stream error rather than never calling cb.
      ended = err
      done()
    }
  }

  // Detach every listener installed by this function.
  function cleanup() {
    stream.removeListener('finish', onClose)
    stream.removeListener('close', onClose)
    stream.removeListener('error', onError)
  }

  stream.on('close', onClose)
  stream.on('finish', onClose)
  stream.on('error', onError)

  process.nextTick(function () {
    looper(function (next) {
      read(null, function (end, data) {
        ended = ended || end

        //you can't "end" a stdout stream, so this needs to be handled specially.
        if(end === true)
          return stream._isStdio ? done() : stream.end()

        // Any other truthy end state is an error: tear the stream down.
        if(ended) {
          destroy(stream)
          return done()
        }

        //I noticed a problem streaming to the terminal:
        //sometimes the end got cut off, creating invalid output.
        //it seems that stdout always emits "drain" when it ends.
        //so this seems to work, but i have been unable to reproduce this test
        //automatically, so you need to run ./test/stdout.js a few times and the end is valid json.
        if(stream._isStdio)
          stream.write(data, function () { next() })
        else {
          // Respect backpressure: wait for 'drain' when write() returns false.
          var pause = stream.write(data)
          if(pause === false)
            stream.once('drain', next)
          else next()
        }
      })
    })
  })
}
// Call `handler` for whichever of the named `events` fires first on `emitter`.
//
// One shared listener is registered under every name in `events`. When it
// fires it is removed from all of those names and then `handler` receives the
// first argument of the event. Returns `emitter`.
function first (emitter, events, handler) {
  function detach (name) {
    emitter.removeListener(name, onFirst)
  }

  function attach (name) {
    emitter.on(name, onFirst)
  }

  function onFirst (val) {
    // Unsubscribe everywhere before notifying.
    events.forEach(detach)
    handler(val)
  }

  events.forEach(attach)
  return emitter
}
// Wrap a node readable as a pull-stream source using the 'readable' event
// and stream.read().
//
// Returns a source function (end, cb). The `end` argument is not inspected
// here. At most one callback is remembered at a time: each call replaces the
// previous one.
function read2(stream) {
  var ended = false
  var waiting = false
  var pending

  // Pull one chunk from the stream and hand it to the remembered callback.
  function pump () {
    var chunk = stream.read()
    if(chunk === null || !pending) return
    var deliver = pending
    pending = null
    deliver(null, chunk)
  }

  // Record the terminal state and notify the remembered callback, if any.
  function finish (state) {
    ended = state
    if(pending) pending(ended)
  }

  stream
    .on('readable', function () {
      waiting = true
      if(pending) pump()
    })
    .on('end', function () {
      finish(true)
    })
    .on('error', function (err) {
      finish(err)
    })

  return function (end, cb) {
    pending = cb
    if(ended) {
      cb(ended)
      return
    }
    // Only try stream.read() once a 'readable' event has been seen.
    if(waiting) pump()
  }
}
// Wrap a node readable as a pull-stream source using 'data' events.
//
// Chunks are queued in `buffer` and readers in `cbs`. While chunks are queued
// the stream is paused (when it has a `pause` method) and it is resumed once
// the queue is empty again.
//
// Returns a source function (abort, cb):
//   - cb is required; calling without one throws.
//   - abort falsy:  cb receives (null, chunk), or (end) once the queue is
//                   empty and the stream has ended or failed.
//   - abort truthy: every waiting reader and cb are called with `abort`,
//                   immediately if the stream already ended, otherwise after
//                   the stream is destroyed and emits 'close'.
function read1(stream) {
  var buffer = [], cbs = [], ended, paused = false

  // Match queued chunks (or the end state) with waiting readers.
  function drain() {
    while((buffer.length || ended) && cbs.length)
      cbs.shift()(buffer.length ? null : ended, buffer.shift())
    if(!buffer.length && paused) {
      paused = false
      stream.resume()
    }
  }

  // 'end' / 'close': mark a clean end, but keep an error that was recorded
  // earlier so that a later reader still sees it.
  function finish () {
    ended = ended || true
    drain()
  }

  stream.on('data', function (data) {
    buffer.push(data)
    drain()
    // Nobody consumed the chunk: stop the flow until a reader catches up.
    if(buffer.length && stream.pause) {
      paused = true
      stream.pause()
    }
  })
  stream.on('end', finish)
  stream.on('close', finish)
  stream.on('error', function (err) {
    ended = err
    drain()
  })

  return function (abort, cb) {
    if(!cb) throw new Error('*must* provide cb')

    if(!abort) {
      cbs.push(cb)
      drain()
      return
    }

    var onAbort = function () {
      while(cbs.length) cbs.shift()(abort)
      cb(abort)
    }

    //if the stream happens to have already ended, then we don't need to abort.
    if(ended) return onAbort()
    stream.once('close', onAbort)
    destroy(stream)
  }
}
// `read` is an alias for read1, the 'data'-event based source.
var read = read1
// Build a pull-stream sink for a node writable.
//
// The returned function takes a pull-stream source and passes it to write()
// together with `stream` and the optional completion callback `cb`,
// returning whatever write() returns.
var sink = function (stream, cb) {
  function attach (upstream) {
    return write(upstream, stream, cb)
  }
  return attach
}
// Build a pull-stream source for a node readable by delegating to read1.
var source = function (stream) {
  var pullSource = read1(stream)
  return pullSource
}
// Main entry point: convert a node stream to the matching pull-stream shape.
//
//   - not writable (no truthy `writable` flag or no `write` method): a source.
//   - writable but not readable: a sink, with `cb` as completion callback.
//   - writable and readable: a through function that writes the upstream
//     source into the stream and returns a source reading from it.
exports = module.exports = function (stream, cb) {
  var canWrite = stream.writable && stream.write
  if(!canWrite) return source(stream)

  if(!stream.readable) return sink(stream, cb)

  return function (_read) {
    write(_read, stream, cb)
    return read1(stream)
  }
}
// Expose the individual helpers as properties of the exported function.
exports.sink = sink
exports.source = source
// `read` is the same function as read1.
exports.read = read
exports.read1 = read1
exports.read2 = read2
exports.duplex = function (stream, cb) {
return {
source: source(stream),
sink: sink(stream, cb)
}
}
exports.transform = function (stream) {
return function (read) {
var _source = source(stream)
sink(stream)(read); return _source
}
}