-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathspub_ssub.ts
108 lines (97 loc) · 3.17 KB
/
spub_ssub.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import MockServer, { getConnectionName } from "../../helpers/mock_server";
import { expect } from "chai";
import { Cluster } from "../../../lib";
import * as sinon from "sinon";
import Redis from "../../../lib/Redis";
import { noop } from "../../../lib/utils";
describe("cluster:spub/ssub", function () {
// Subscribes to a shard channel through a Cluster client, then has the mock
// node listening on port 30001 write an "smessage" reply to the connection
// named "ioredis-cluster(subscriber)". The test passes once the client emits
// an "smessage" event carrying the same channel and payload.
it("should receive messages", (done) => {
  // Both mock nodes answer CLUSTER SLOTS with the same two-range layout;
  // any other command gets no explicit reply from this handler.
  const replyToSlots = (argv) => {
    if (argv[0] !== "cluster" || argv[1] !== "SLOTS") {
      return undefined;
    }
    return [
      [0, 1, ["127.0.0.1", 30001]],
      [2, 16383, ["127.0.0.1", 30002]],
    ];
  };

  const firstNode = new MockServer(30001, replyToSlots);
  new MockServer(30002, replyToSlots);

  const startupNodes = [{ host: "127.0.0.1", port: "30001" }];
  const ssub = new Cluster(startupNodes);

  ssub.ssubscribe("test cluster", () => {
    // Look the subscriber connection up only after the subscribe callback
    // has fired, then push a shard message to it from the mock node.
    const subscriberConnection = firstNode.findClientByName(
      "ioredis-cluster(subscriber)"
    );
    const pushedReply = ["smessage", "test shard channel", "hi"];
    firstNode.write(subscriberConnection, pushedReply);
  });

  ssub.on("smessage", (channel, message) => {
    expect(channel).to.eql("test shard channel");
    expect(message).to.eql("hi");
    ssub.disconnect();
    done();
  });
});
// Checks that a Cluster client that has issued SSUBSCRIBE can still send a
// regular command (SET) and receive its reply.
it("should works when sending regular commands", (done) => {
  // Single mock node that owns the whole slot range.
  const handler = function (argv) {
    if (argv[0] === "cluster" && argv[1] === "SLOTS") {
      return [[0, 16383, ["127.0.0.1", 30001]]];
    }
  };
  new MockServer(30001, handler);

  const ssub = new Cluster([{ port: "30001" }]);

  ssub.ssubscribe("test cluster", function () {
    ssub
      .set("foo", "bar")
      .then((res) => {
        expect(res).to.eql("OK");
        ssub.disconnect();
        done();
      })
      // Without a rejection handler, a rejected SET or a failed assertion
      // above would surface only as an unhandled rejection and the test
      // would time out. Release the client and hand the error to mocha.
      .catch((err) => {
        ssub.disconnect();
        done(err);
      });
  });
});
// Connects with a password and checks, on the mock server side, that the
// connection issuing SSUBSCRIBE has already sent AUTH with that password and
// is named "ioredis-cluster(subscriber)".
it("supports password", (done) => {
  const respond = (argv, connection) => {
    const command = argv[0];
    switch (command) {
      case "auth":
        // Remember the password on the connection so the ssubscribe branch
        // can verify it later.
        connection.password = argv[1];
        return undefined;
      case "ssubscribe":
        expect(connection.password).to.eql("abc");
        expect(getConnectionName(connection)).to.eql(
          "ioredis-cluster(subscriber)"
        );
        return undefined;
      case "cluster":
        // Only CLUSTER SLOTS gets an explicit reply: one node, all slots.
        return argv[1] === "SLOTS"
          ? [[0, 16383, ["127.0.0.1", 30001]]]
          : undefined;
      default:
        return undefined;
    }
  };
  new MockServer(30001, respond);

  const startupNodes = [{ port: "30001", password: "abc" }];
  const ssub = new Redis.Cluster(startupNodes);

  ssub.ssubscribe("test cluster", () => {
    ssub.disconnect();
    done();
  });
});
// Subscribes to a shard channel, disconnects, reconnects on "end", and
// expects Redis.prototype.ssubscribe to be called again with the previously
// subscribed channel.
it("should re-ssubscribe after reconnection", (done) => {
  new MockServer(30001, function (argv) {
    if (argv[0] === "cluster" && argv[1] === "SLOTS") {
      return [[0, 16383, ["127.0.0.1", 30001]]];
    } else if (argv[0] === "ssubscribe" || argv[0] === "psubscribe") {
      return [argv[0], argv[1]];
    }
  });

  const client = new Cluster([{ host: "127.0.0.1", port: "30001" }]);

  client.ssubscribe("test cluster", function () {
    const stub = sinon
      .stub(Redis.prototype, "ssubscribe")
      // Must be a regular function, not an arrow: an arrow would capture
      // `this` and `arguments` from the enclosing ssubscribe callback, so
      // the pass-through below would call the real method with the wrong
      // receiver and arguments.
      .callsFake(function (...args) {
        expect(args[0]).to.eql(["test cluster"]);
        // Restore first so Redis.prototype.ssubscribe below is the real
        // implementation again rather than this fake.
        stub.restore();
        client.disconnect();
        done();
        return Redis.prototype.ssubscribe.apply(this, args);
      });

    // Reconnect once the client has fully ended; connection errors during
    // this reconnect are deliberately ignored.
    client.once("end", function () {
      client.connect().catch(noop);
    });
    client.disconnect();
  });
});
});