-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
56 lines (44 loc) · 1.01 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
var mqstreams = require('mqstreams')
, Graft = require('graft')
, inherits = require('inherits')
, through2 = require('through2')
, mqemitter = require('mqemitter')
/**
 * Wraps a transform callback in a stream created by `through2.obj`.
 *
 * Every stream built through this helper is given the same options object,
 * `{ highWaterMark: 16 }`.
 *
 * @param {Function} func - Transform callback handed to `through2.obj` unchanged.
 * @returns {*} Whatever `through2.obj` returns for these options and callback.
 */
function through(func) {
  var streamOptions = { highWaterMark: 16 }

  return through2.obj(streamOptions, func)
}
/**
 * Builds the stream that handles `subscribe` requests.
 *
 * For each request the handler either pipes `mq.readable(req.topic)` into
 * `req.messages`, or rejects the request when the topic or the destination
 * is missing. The outcome is written to `req.ret` with `end()` when the
 * request carries a `ret`, and `done` is always called afterwards.
 *
 * Status objects sent on `req.ret`:
 *   - `{ status: 'subscribed', topic }` on success
 *   - `{ status: 'not subscribed', reason: 'missing topic' }`
 *   - `{ status: 'not subscribed', reason: 'missing messages' }`
 *
 * @param {Object} mq - Object exposing `readable(topic)`; the value it
 *   returns must support `pipe(destination)`.
 * @returns {*} The stream produced by `through` around the request handler.
 */
function subscribe(mq) {
  return through(function(req, enc, done) {
    var returnStatus

    if (!req.topic) {
      returnStatus = {
          status: 'not subscribed'
        , reason: 'missing topic'
      }
    } else if (!req.messages) {
      // There is nothing to pipe into. Report the failure to the caller
      // instead of letting pipe() fail on an undefined destination, which
      // would leave `ret` unanswered and `done` uncalled.
      returnStatus = {
          status: 'not subscribed'
        , reason: 'missing messages'
      }
    } else {
      mq.readable(req.topic).pipe(req.messages)
      returnStatus = {
          status: 'subscribed'
        , topic: req.topic
      }
    }

    // `ret` is optional: requests without it get no status reply.
    if (req.ret) {
      req.ret.end(returnStatus)
    }

    done()
  })
}
/**
 * Builds the stream that handles `publish` requests.
 *
 * Each incoming request object is passed to `mq.emit` as-is. The stream
 * callback is handed over as the second argument, so it is up to `mq.emit`
 * to invoke it.
 *
 * @param {Object} mq - Object exposing `emit(message, callback)`.
 * @returns {*} The stream produced by `through` around the request handler.
 */
function publish(mq) {
  function forwardToBroker(message, encoding, next) {
    mq.emit(message, next)
  }

  return through(forwardToBroker)
}
module.exports = function(mq) {
var graft = new Graft()
if (!mq)
mq = mqemitter()
mq = mqstreams(mq)
graft.where({ cmd: 'subscribe' }, subscribe(mq))
graft.where({ cmd: 'publish' }, publish(mq))
return graft
}