-
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathtesterWatcher.js
148 lines (131 loc) · 7.12 KB
/
testerWatcher.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// Copyright (C) 2017-2022 BinaryMist Limited. All rights reserved.
// Use of this software is governed by the Business Source License
// included in the file /licenses/bsl.md
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0
import redis from 'redis';
// Module-level state shared by every function in this file.
// log, redisOptions and longPollTimeout start out undefined and are assigned by serverStart from its options argument.
let log;
let redisOptions;
let longPollTimeout;
// pub/sub (SSE)
// Redis clients created by subscribe, keyed by redis channel name.
let subscribeClients = {};
// Redis clients used for the non-blocking list commands in this file (lLen, lPop, rPush), keyed by redis list name.
let nonBlockingClients = {};
// For queueing (which we use for long polling) as opposed to pub/sub (which we use for SSE):
// If non blocking redis commands are used on a redis client that isn't using blocking commands, the non-blocking commands may be queued up until after the blocking ones finish.
// So we use client instances that will be used for blocking commands only.
// Keyed by redis list name, used for blPop. All three client maps are replaced with empty objects by cleanUpAfterTestRun.
let blockingClients = {};
// Creates a dedicated redis client for redisChannel, stores it in subscribeClients[redisChannel],
// connects it and subscribes it to the channel so that callback is handed to the client as the channel listener.
// Connection and subscription failures are logged rather than thrown, so the returned promise resolves to undefined either way.
const subscribe = async (redisChannel, callback) => {
  const logMeta = () => ({ tags: ['testerWatcher'] });

  const freshClient = redis.createClient(redisOptions);
  subscribeClients[redisChannel] = freshClient;
  freshClient.on('error', (redisError) => {
    log.error(`Redis error: ${redisError}`, logMeta());
  });

  // The client is looked up again after each await (rather than reusing freshClient) because the map is shared module state.
  const pendingConnection = subscribeClients[redisChannel].connect();
  try {
    await pendingConnection;
    log.info(`A connection is established for subscribeClients[redisChannel: ${redisChannel}] to redis at "${redisOptions.socket.host}:${redisOptions.socket.port}".`, logMeta());
  } catch (connectError) {
    log.error(`An error occurred for subscribeClients[redisChannel: ${redisChannel}] while trying to connect. The error was: "${connectError.message}".`, logMeta());
  }

  log.info(`About to subscribe ${redisChannel} redis client to channel: ${redisChannel}`, logMeta());

  // The subscribe call is issued outside the try so that only a rejected promise is logged here.
  const pendingSubscription = subscribeClients[redisChannel].subscribe(redisChannel, callback);
  try {
    await pendingSubscription;
  } catch (subscribeError) {
    log.error(`Redis error: ${subscribeError}`, logMeta());
  }
};
// Returns the tester messages available for redisList as an array:
// - If the list currently has > 0 items, as many items as were counted are popped and returned.
// - If the list has 0 items (or its length could not be read), we block (up to longPollTimeout) until one item
//   arrives and return [item], or [null] if the wait timed out or failed.
// Redis failures are logged, not thrown. An array is always returned so callers can iterate the result.
const getTesterMessages = async (redisList) => {
  const logMeta = () => ({ tags: ['testerWatcher'] });
  const nonBlockingClient = nonBlockingClients[redisList];
  const blockingClient = blockingClients[redisList];

  const curListLen = await nonBlockingClient.lLen(redisList).catch((e) => {
    log.error(`Error occurred while attempting to get list length of list "${redisList}". Error was: ${e}`, logMeta());
  });

  if (curListLen > 0) {
    const handleLpopError = (e) => { log.error(`Error occurred while attempting to lpop list "${redisList}". Error was: ${e}`, logMeta()); };
    const testerMessageSetOfPromises = Array.from(
      { length: curListLen },
      () => nonBlockingClient.lPop(redisList).catch(handleLpopError)
    );
    const poppedMessages = await Promise.all(testerMessageSetOfPromises).catch((e) => { log.error(`Error occurred while attempting to resolve testerMessageSetOfPromises lpop'ed from list "${redisList}". Error was: ${e}`, logMeta()); });
    // Never hand undefined back to the caller, even if the catch above fired.
    return Array.isArray(poppedMessages) ? poppedMessages : [];
  }

  // Wait...
  // blPop's resolved value depends on the client (https://redis.io/commands/blpop#return-value):
  // 1. node-redis v4 reply: null if it timed out, otherwise an object { key, element } where key is the list name
  //    and element is the value lpoped (popped from the head) from the list.
  // 2. Raw redis reply: an array whose first element is null if it timed out, otherwise [key, element].
  // Both shapes are accepted. Reading index 1 of the v4 object reply would always yield undefined.
  const toPoppedElement = (reply) => {
    if (!reply) return null; // Timed out, or blPop failed and was logged below.
    if (Array.isArray(reply)) return reply[0] !== null ? reply[1] : null;
    return reply.element !== undefined ? reply.element : null;
  };

  const blPopReply = await blockingClient.blPop(redisList, longPollTimeout).catch((e) => { log.error(`Error occurred while attempting to blPop list "${redisList}". Error was: ${e}`, logMeta()); });
  return [toPoppedElement(blPopReply)];
};
// Creates a redis client, stores it at clients[redisList], registers an 'error' listener on it and connects it.
// clientsName is only used to label the log messages. Connection failures are logged, not thrown.
const connectListClient = async (clients, clientsName, redisList) => {
  const logMeta = () => ({ tags: ['testerWatcher'] });
  const client = redis.createClient(redisOptions);
  // eslint-disable-next-line no-param-reassign
  clients[redisList] = client;
  // node-redis requires at least one 'error' listener per client, otherwise an emitted error is thrown and takes the process down.
  client.on('error', (error) => {
    log.error(`Redis error: ${error}`, logMeta());
  });
  await client.connect()
    .then(() => {
      log.info(`A connection is established for ${clientsName}[redisList: ${redisList}] to redis at "${redisOptions.socket.host}:${redisOptions.socket.port}".`, logMeta());
    })
    .catch((e) => {
      log.error(`An error occurred for ${clientsName}[redisList: ${redisList}] while trying to connect. The error was: "${e.message}".`, logMeta());
    });
};

// Long-poll entry point for one redis channel.
// On the first call for a channel it creates the non-blocking and blocking list clients and subscribes to the channel,
// copying every published message onto a redis list with the same name as the channel.
// Every call then waits for the messages of that list (see getTesterMessages), passes each message to
// callback(message, redisChannel) one after the other, and resolves to the array of callback results in message order.
const pollTesterMessages = async (redisChannel, callback) => {
  const redisList = redisChannel;
  // Only do the subscription once for each channel.
  if (!subscribeClients[redisChannel]) {
    await connectListClient(nonBlockingClients, 'nonBlockingClients', redisList);
    await connectListClient(blockingClients, 'blockingClients', redisList);
    await subscribe(redisChannel, async (message, channel) => {
      // Push message to list with same name as channel.
      // Nothing awaits this listener, so a failure has to be handled here or it becomes an unhandled rejection.
      try {
        await nonBlockingClients[channel].rPush(redisList, message);
      } catch (e) {
        log.error(`Error occurred while attempting to rPush to list "${redisList}". Error was: ${e}`, { tags: ['testerWatcher'] });
      }
    });
  }
  // Here is where we block and wait for our list to contain messages,
  // then build a response array for the given list, pass each message into callback,
  // callback also contains a reference to cleanUpAfterTestRun,
  // then return the CLI ready set of messages.
  const testerMessageSet = await getTesterMessages(redisList);
  // The async reduce keeps the callbacks strictly sequential: each step awaits the accumulated results first.
  const cliAfiedTesterMessageSet = await testerMessageSet.reduce(async (accum, tM) => {
    const results = await accum;
    return [...results, await callback(tM, redisChannel)];
  }, []);
  return cliAfiedTesterMessageSet;
};
// Unsubscribes and quits every subscribe client, quits every non-blocking and blocking client,
// then replaces the three client maps with empty objects. Returns undefined without waiting for redis.
// The redis commands return promises that nobody awaits, so each one gets a rejection handler here:
// a client that is already closed (or never connected) would otherwise surface as an unhandled rejection.
const cleanUpAfterTestRun = () => {
  const issueCommand = (client, command) => {
    const reportFailure = (e) => {
      log.error(`Error occurred while attempting to ${command} a redis client during clean up. Error was: ${e}`, { tags: ['testerWatcher'] });
    };
    try {
      // Promise.resolve tolerates clients whose commands do not return a promise.
      Promise.resolve(client[command]()).catch(reportFailure);
    } catch (e) {
      // A synchronous throw must not stop the remaining clients from being closed or the maps from being reset.
      reportFailure(e);
    }
  };

  // Will need to cleanup subscribeClients.
  Object.values(subscribeClients).forEach((c) => { issueCommand(c, 'unsubscribe'); issueCommand(c, 'quit'); });
  Object.values(nonBlockingClients).forEach((c) => { issueCommand(c, 'quit'); });
  Object.values(blockingClients).forEach((c) => { issueCommand(c, 'quit'); });
  subscribeClients = {};
  nonBlockingClients = {};
  blockingClients = {};
};
// Stores the logger, redis connection options and long poll timeout from options in module state,
// then returns the set of functions matching options.testerFeedbackCommsMedium:
// 'lp' (long polling) exposes pollTesterMessages, 'sse' (server sent events) exposes subscribe.
// Both sets also carry cleanUpAfterTestRun and the chosen testerFeedbackCommsMedium.
const serverStart = (options) => {
  const { log: logger, redis: redisSettings, longPollTimeout: pollTimeout } = options;
  log = logger;
  redisOptions = redisSettings;
  longPollTimeout = pollTimeout;

  const { testerFeedbackCommsMedium } = options;
  const longPolling = { pollTesterMessages, cleanUpAfterTestRun, testerFeedbackCommsMedium };
  const serverSentEvents = { subscribe, cleanUpAfterTestRun, testerFeedbackCommsMedium };
  const commsMediums = { lp: longPolling, sse: serverSentEvents };
  return commsMediums[testerFeedbackCommsMedium];
};
export {
subscribe,
pollTesterMessages,
cleanUpAfterTestRun,
serverStart
};
// Test Job Channels:
// app-lowPrivUser -> list
// app-adminUser -> list
// server-NA -> list
// tls-NA -> list
// events
// 'testerProgress'
// 'testerPctComplete'
// 'testerBugCount'