diff --git a/README.md b/README.md index b06b7f3cfd..4fca8d405d 100644 --- a/README.md +++ b/README.md @@ -166,47 +166,7 @@ To learn more about isolated execution, check out the [guide](./docs/isolated-ex ### Pub/Sub -Subscribing to a channel requires a dedicated stand-alone connection. You can easily get one by `.duplicate()`ing an existing Redis connection. - -```typescript -const subscriber = client.duplicate(); - -await subscriber.connect(); -``` - -Once you have one, simply subscribe and unsubscribe as needed: - -```typescript -await subscriber.subscribe('channel', (message) => { - console.log(message); // 'message' -}); - -await subscriber.pSubscribe('channe*', (message, channel) => { - console.log(message, channel); // 'message', 'channel' -}); - -await subscriber.unsubscribe('channel'); - -await subscriber.pUnsubscribe('channe*'); -``` - -Publish a message on a channel: - -```typescript -await publisher.publish('channel', 'message'); -``` - -There is support for buffers as well: - -```typescript -await subscriber.subscribe('channel', (message) => { - console.log(message); // -}, true); - -await subscriber.pSubscribe('channe*', (message, channel) => { - console.log(message, channel); // , -}, true); -``` +See the [Pub/Sub overview](./docs/pub-sub.md). ### Scan Iterator @@ -373,15 +333,18 @@ Check out the [Clustering Guide](./docs/clustering.md) when using Node Redis to The Node Redis client class is an Nodejs EventEmitter and it emits an event each time the network status changes: -| Event name | Scenes | Arguments to be passed to the listener | -|----------------|-------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------| -| `connect` | The client is initiating a connection to the server. | _No argument_ | -| `ready` | The client successfully initiated the connection to the server. | _No argument_ | -| `end` | The client disconnected the connection to the server via `.quit()` or `.disconnect()`. | _No argument_ | -| `error` | When a network error has occurred, such as unable to connect to the server or the connection closed unexpectedly. | 1 argument: The error object, such as `SocketClosedUnexpectedlyError: Socket closed unexpectedly` or `Error: connect ECONNREFUSED [IP]:[PORT]` | -| `reconnecting` | The client is trying to reconnect to the server. | _No argument_ | +| Name | When | Listener arguments | +|-------------------------|------------------------------------------------------------------------------------|------------------------------------------------------------| +| `connect` | Initiating a connection to the server | *No arguments* | +| `ready` | Client is ready to use | *No arguments* | +| `end` | Connection has been closed (via `.quit()` or `.disconnect()`) | *No arguments* | +| `error` | An error has occurred—usually a network issue such as "Socket closed unexpectedly" | `(error: Error)` | +| `reconnecting` | Client is trying to reconnect to the server | *No arguments* | +| `sharded-channel-moved` | See [here](./docs/pub-sub.md#sharded-channel-moved-event) | See [here](./docs/pub-sub.md#sharded-channel-moved-event) | + +> :warning: You **MUST** listen to `error` events. If a client doesn't have at least one `error` listener registered and an `error` occurs, that error will be thrown and the Node.js process will exit. See the [`EventEmitter` docs](https://nodejs.org/api/events.html#events_error_events) for more details. -The client will not emit [any other events](./docs/v3-to-v4.md#all-the-removed-events) beyond those listed above. +> The client will not emit [any other events](./docs/v3-to-v4.md#all-the-removed-events) beyond those listed above. ## Supported Redis versions diff --git a/docs/client-configuration.md b/docs/client-configuration.md index d57d0c5dd3..1854f07158 100644 --- a/docs/client-configuration.md +++ b/docs/client-configuration.md @@ -15,7 +15,7 @@ | socket.reconnectStrategy | `retries => Math.min(retries * 50, 500)` | A function containing the [Reconnect Strategy](#reconnect-strategy) logic | | username | | ACL username ([see ACL guide](https://redis.io/topics/acl)) | | password | | ACL password or the old "--requirepass" password | -| name | | Connection name ([see `CLIENT SETNAME`](https://redis.io/commands/client-setname)) | +| name | | Client name ([see `CLIENT SETNAME`](https://redis.io/commands/client-setname)) | | database | | Redis database number (see [`SELECT`](https://redis.io/commands/select) command) | | modules | | Included [Redis Modules](../README.md#packages) | | scripts | | Script definitions (see [Lua Scripts](../README.md#lua-scripts)) | @@ -25,30 +25,22 @@ | readonly | `false` | Connect in [`READONLY`](https://redis.io/commands/readonly) mode | | legacyMode | `false` | Maintain some backwards compatibility (see the [Migration Guide](./v3-to-v4.md)) | | isolationPoolOptions | | See the [Isolated Execution Guide](./isolated-execution.md) | -| pingInterval | | Send `PING` command at interval (in ms). Useful with "[Azure Cache for Redis](https://learn.microsoft.com/en-us/azure/azure-cache-for-redis/cache-best-practices-connection#idle-timeout)" | +| pingInterval | | Send `PING` command at interval (in ms). Useful with ["Azure Cache for Redis"](https://learn.microsoft.com/en-us/azure/azure-cache-for-redis/cache-best-practices-connection#idle-timeout) | ## Reconnect Strategy -When a network error occurs the client will automatically try to reconnect, following a default linear strategy (the more attempts, the more waiting before trying to reconnect). +When the socket closes unexpectedly (without calling `.quit()`/`.disconnect()`), the client uses `reconnectStrategy` to decide what to do. The following values are supported: +1. `false` -> do not reconnect, close the client and flush the command queue. +2. `number` -> wait for `X` milliseconds before reconnecting. +3. `(retries: number, cause: Error) => false | number | Error` -> `number` is the same as configuring a `number` directly, `Error` is the same as `false`, but with a custom error. -This strategy can be overridden by providing a `socket.reconnectStrategy` option during the client's creation. +By default the strategy is `Math.min(retries * 50, 500)`, but it can be overwritten like so: -The `socket.reconnectStrategy` is a function that: - -- Receives the number of retries attempted so far. -- Returns `number | Error`: - - `number`: wait time in milliseconds prior to attempting a reconnect. - - `Error`: closes the client and flushes internal command queues. - -The example below shows the default `reconnectStrategy` and how to override it. - -```typescript -import { createClient } from 'redis'; - -const client = createClient({ - socket: { - reconnectStrategy: (retries) => Math.min(retries * 50, 500) - } +```javascript +createClient({ + socket: { + reconnectStrategy: retries => Math.min(retries * 50, 1000) + } }); ``` @@ -60,7 +52,7 @@ To enable TLS, set `socket.tls` to `true`. Below are some basic examples. ### Create a SSL client -```typescript +```javascript createClient({ socket: { tls: true, @@ -72,7 +64,7 @@ createClient({ ### Create a SSL client using a self-signed certificate -```typescript +```javascript createClient({ socket: { tls: true, diff --git a/docs/clustering.md b/docs/clustering.md index 48fc98640c..28ea0e2964 100644 --- a/docs/clustering.md +++ b/docs/clustering.md @@ -35,6 +35,7 @@ const value = await cluster.get('key'); | rootNodes | | An array of root nodes that are part of the cluster, which will be used to get the cluster topology. Each element in the array is a client configuration object. There is no need to specify every node in the cluster, 3 should be enough to reliably connect and obtain the cluster configuration from the server | | defaults | | The default configuration values for every client in the cluster. Use this for example when specifying an ACL user to connect with | | useReplicas | `false` | When `true`, distribute load by executing readonly commands (such as `GET`, `GEOSEARCH`, etc.) across all cluster nodes. When `false`, only use master nodes | +| minimizeConnections | `false` | When `true`, `.connect()` will only discover the cluster topology, without actually connecting to all the nodes. Useful for short-term or Pub/Sub-only connections. | | maxCommandRedirections | `16` | The maximum number of times a command will be redirected due to `MOVED` or `ASK` errors | | nodeAddressMap | | Defines the [node address mapping](#node-address-map) | | modules | | Included [Redis Modules](../README.md#packages) | @@ -59,27 +60,45 @@ createCluster({ ## Node Address Map -A node address map is required when a Redis cluster is configured with addresses that are inaccessible by the machine running the Redis client. -This is a mapping of addresses and ports, with the values being the accessible address/port combination. Example: +A mapping between the addresses in the cluster (see `CLUSTER SHARDS`) and the addresses the client should connect to. +Useful when the cluster is running on a different network to the client. ```javascript +const rootNodes = [{ + url: 'external-host-1.io:30001' +}, { + url: 'external-host-2.io:30002' +}]; + +// Use either a static mapping: createCluster({ - rootNodes: [{ - url: 'external-host-1.io:30001' - }, { - url: 'external-host-2.io:30002' - }], + rootNodes, nodeAddressMap: { '10.0.0.1:30001': { - host: 'external-host-1.io', + host: 'external-host.io', port: 30001 }, '10.0.0.2:30002': { - host: 'external-host-2.io', + host: 'external-host.io', port: 30002 } } }); + +// or create the mapping dynamically, as a function: +createCluster({ + rootNodes, + nodeAddressMap(address) { + const indexOfDash = address.lastIndexOf('-'), + indexOfDot = address.indexOf('.', indexOfDash), + indexOfColons = address.indexOf(':', indexOfDot); + + return { + host: `external-host-${address.substring(indexOfDash + 1, indexOfDot)}.io`, + port: Number(address.substring(indexOfColons + 1)) + }; + } +}); ``` > This is a common problem when using ElastiCache. See [Accessing ElastiCache from outside AWS](https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/accessing-elasticache.html) for more information on that. diff --git a/docs/pub-sub.md b/docs/pub-sub.md new file mode 100644 index 0000000000..1428dca77f --- /dev/null +++ b/docs/pub-sub.md @@ -0,0 +1,86 @@ +# Pub/Sub + +The Pub/Sub API is implemented by `RedisClient` and `RedisCluster`. + +## Pub/Sub with `RedisClient` + +Pub/Sub requires a dedicated stand-alone client. You can easily get one by `.duplicate()`ing an existing `RedisClient`: + +```typescript +const subscriber = client.duplicate(); +subscribe.on('error', err => console.error(err)); +await subscriber.connect(); +``` + +When working with a `RedisCluster`, this is handled automatically for you. + +### `sharded-channel-moved` event + +`RedisClient` emits the `sharded-channel-moved` event when the ["cluster slot"](https://redis.io/docs/reference/cluster-spec/#key-distribution-model) of a subscribed [Sharded Pub/Sub](https://redis.io/docs/manual/pubsub/#sharded-pubsub) channel has been moved to another shard. + +The event listener signature is as follows: +```typescript +( + channel: string, + listeners: { + buffers: Set; + strings: Set; + } +)`. +``` + +## Subscribing + +```javascript +const listener = (message, channel) => console.log(message, channel); +await client.subscribe('channel', listener); +await client.pSubscribe('channe*', listener); +// Use sSubscribe for sharded Pub/Sub: +await client.sSubscribe('channel', listener); +``` + +## Publishing + +```javascript +await client.publish('channel', 'message'); +// Use sPublish for sharded Pub/Sub: +await client.sPublish('channel', 'message'); +``` + +## Unsubscribing + +The code below unsubscribes all listeners from all channels. + +```javascript +await client.unsubscribe(); +await client.pUnsubscribe(); +// Use sUnsubscribe for sharded Pub/Sub: +await client.sUnsubscribe(); +``` + +To unsubscribe from specific channels: + +```javascript +await client.unsubscribe('channel'); +await client.unsubscribe(['1', '2']); +``` + +To unsubscribe a specific listener: + +```javascript +await client.unsubscribe('channel', listener); +``` + +## Buffers + +Publishing and subscribing using `Buffer`s is also supported: + +```javascript +await subscriber.subscribe('channel', message => { + console.log(message); // +}, true); // true = subscribe in `Buffer` mode. + +await subscriber.publish(Buffer.from('channel'), Buffer.from('message')); +``` + +> NOTE: Buffers and strings are supported both for the channel name and the message. You can mix and match these as desired. diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index f951cd6f84..7fffed8658 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,18 +1,18 @@ import * as LinkedList from 'yallist'; import { AbortError, ErrorReply } from '../errors'; -import { RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply } from '../commands'; +import { RedisCommandArguments, RedisCommandRawReply } from '../commands'; import RESP2Decoder from './RESP2/decoder'; import encodeCommand from './RESP2/encoder'; +import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; export interface QueueCommandOptions { asap?: boolean; chainId?: symbol; signal?: AbortSignal; returnBuffers?: boolean; - ignorePubSubMode?: boolean; } -interface CommandWaitingToBeSent extends CommandWaitingForReply { +export interface CommandWaitingToBeSent extends CommandWaitingForReply { args: RedisCommandArguments; chainId?: symbol; abort?: { @@ -28,27 +28,9 @@ interface CommandWaitingForReply { returnBuffers?: boolean; } -export enum PubSubSubscribeCommands { - SUBSCRIBE = 'SUBSCRIBE', - PSUBSCRIBE = 'PSUBSCRIBE' -} - -export enum PubSubUnsubscribeCommands { - UNSUBSCRIBE = 'UNSUBSCRIBE', - PUNSUBSCRIBE = 'PUNSUBSCRIBE' -} - -export type PubSubListener< - RETURN_BUFFERS extends boolean = false, - T = RETURN_BUFFERS extends true ? Buffer : string -> = (message: T, channel: T) => unknown; +const PONG = Buffer.from('pong'); -interface PubSubListeners { - buffers: Set>; - strings: Set>; -} - -type PubSubListenersMap = Map; +export type OnShardedChannelMoved = (channel: string, listeners: ChannelListeners) => void; export default class RedisCommandsQueue { static #flushQueue(queue: LinkedList, err: Error): void { @@ -57,67 +39,54 @@ export default class RedisCommandsQueue { } } - static #emitPubSubMessage(listenersMap: PubSubListenersMap, message: Buffer, channel: Buffer, pattern?: Buffer): void { - const keyString = (pattern ?? channel).toString(), - listeners = listenersMap.get(keyString); - - if (!listeners) return; - - for (const listener of listeners.buffers) { - listener(message, channel); - } - - if (!listeners.strings.size) return; - - const channelString = pattern ? channel.toString() : keyString, - messageString = channelString === '__redis__:invalidate' ? - // https://github.com/redis/redis/pull/7469 - // https://github.com/redis/redis/issues/7463 - (message === null ? null : (message as any as Array).map(x => x.toString())) as any : - message.toString(); - for (const listener of listeners.strings) { - listener(messageString, channelString); - } - } - readonly #maxLength: number | null | undefined; readonly #waitingToBeSent = new LinkedList(); readonly #waitingForReply = new LinkedList(); + readonly #onShardedChannelMoved: OnShardedChannelMoved; - readonly #pubSubState = { - isActive: false, - subscribing: 0, - subscribed: 0, - unsubscribing: 0, - listeners: { - channels: new Map(), - patterns: new Map() - } - }; + readonly #pubSub = new PubSub(); - static readonly #PUB_SUB_MESSAGES = { - message: Buffer.from('message'), - pMessage: Buffer.from('pmessage'), - subscribe: Buffer.from('subscribe'), - pSubscribe: Buffer.from('psubscribe'), - unsubscribe: Buffer.from('unsubscribe'), - pUnsubscribe: Buffer.from('punsubscribe') - }; + get isPubSubActive() { + return this.#pubSub.isActive; + } #chainInExecution: symbol | undefined; #decoder = new RESP2Decoder({ returnStringsAsBuffers: () => { return !!this.#waitingForReply.head?.value.returnBuffers || - this.#pubSubState.isActive; + this.#pubSub.isActive; }, onReply: reply => { - if (this.#handlePubSubReply(reply)) { - return; - } else if (!this.#waitingForReply.length) { - throw new Error('Got an unexpected reply from Redis'); + if (this.#pubSub.isActive && Array.isArray(reply)) { + if (this.#pubSub.handleMessageReply(reply as Array)) return; + + const isShardedUnsubscribe = PubSub.isShardedUnsubscribe(reply as Array); + if (isShardedUnsubscribe && !this.#waitingForReply.length) { + const channel = (reply[1] as Buffer).toString(); + this.#onShardedChannelMoved( + channel, + this.#pubSub.removeShardedListeners(channel) + ); + return; + } else if (isShardedUnsubscribe || PubSub.isStatusReply(reply as Array)) { + const head = this.#waitingForReply.head!.value; + if ( + (Number.isNaN(head.channelsCounter!) && reply[2] === 0) || + --head.channelsCounter! === 0 + ) { + this.#waitingForReply.shift()!.resolve(); + } + return; + } + if (PONG.equals(reply[0] as Buffer)) { + const { resolve, returnBuffers } = this.#waitingForReply.shift()!, + buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer; + resolve(returnBuffers ? buffer : buffer.toString()); + return; + } } - + const { resolve, reject } = this.#waitingForReply.shift()!; if (reply instanceof ErrorReply) { reject(reply); @@ -127,14 +96,16 @@ export default class RedisCommandsQueue { } }); - constructor(maxLength: number | null | undefined) { + constructor( + maxLength: number | null | undefined, + onShardedChannelMoved: OnShardedChannelMoved + ) { this.#maxLength = maxLength; + this.#onShardedChannelMoved = onShardedChannelMoved; } addCommand(args: RedisCommandArguments, options?: QueueCommandOptions): Promise { - if (this.#pubSubState.isActive && !options?.ignorePubSubMode) { - return Promise.reject(new Error('Cannot send commands in PubSub mode')); - } else if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) { + if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) { return Promise.reject(new Error('The queue is full')); } else if (options?.signal?.aborted) { return Promise.reject(new AbortError()); @@ -173,158 +144,76 @@ export default class RedisCommandsQueue { } subscribe( - command: PubSubSubscribeCommands, - channels: RedisCommandArgument | Array, + type: PubSubType, + channels: string | Array, listener: PubSubListener, returnBuffers?: T - ): Promise { - const channelsToSubscribe: Array = [], - listenersMap = command === PubSubSubscribeCommands.SUBSCRIBE ? - this.#pubSubState.listeners.channels : - this.#pubSubState.listeners.patterns; - for (const channel of (Array.isArray(channels) ? channels : [channels])) { - const channelString = typeof channel === 'string' ? channel : channel.toString(); - let listeners = listenersMap.get(channelString); - if (!listeners) { - listeners = { - buffers: new Set(), - strings: new Set() - }; - listenersMap.set(channelString, listeners); - channelsToSubscribe.push(channel); - } - - // https://github.com/microsoft/TypeScript/issues/23132 - (returnBuffers ? listeners.buffers : listeners.strings).add(listener as any); - } - - if (!channelsToSubscribe.length) { - return Promise.resolve(); - } - - return this.#pushPubSubCommand(command, channelsToSubscribe); + ) { + return this.#pushPubSubCommand( + this.#pubSub.subscribe(type, channels, listener, returnBuffers) + ); } unsubscribe( - command: PubSubUnsubscribeCommands, + type: PubSubType, channels?: string | Array, listener?: PubSubListener, returnBuffers?: T - ): Promise { - const listeners = command === PubSubUnsubscribeCommands.UNSUBSCRIBE ? - this.#pubSubState.listeners.channels : - this.#pubSubState.listeners.patterns; - - if (!channels) { - const size = listeners.size; - listeners.clear(); - return this.#pushPubSubCommand(command, size); - } - - const channelsToUnsubscribe = []; - for (const channel of (Array.isArray(channels) ? channels : [channels])) { - const sets = listeners.get(channel); - if (!sets) continue; - - let shouldUnsubscribe; - if (listener) { - // https://github.com/microsoft/TypeScript/issues/23132 - (returnBuffers ? sets.buffers : sets.strings).delete(listener as any); - shouldUnsubscribe = !sets.buffers.size && !sets.strings.size; - } else { - shouldUnsubscribe = true; - } + ) { + return this.#pushPubSubCommand( + this.#pubSub.unsubscribe(type, channels, listener, returnBuffers) + ); + } - if (shouldUnsubscribe) { - channelsToUnsubscribe.push(channel); - listeners.delete(channel); - } - } + resubscribe(): Promise | undefined { + const commands = this.#pubSub.resubscribe(); + if (!commands.length) return; - if (!channelsToUnsubscribe.length) { - return Promise.resolve(); - } + return Promise.all( + commands.map(command => this.#pushPubSubCommand(command)) + ); + } - return this.#pushPubSubCommand(command, channelsToUnsubscribe); + extendPubSubChannelListeners( + type: PubSubType, + channel: string, + listeners: ChannelListeners + ) { + return this.#pushPubSubCommand( + this.#pubSub.extendChannelListeners(type, channel, listeners) + ); } - #pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels: number | Array): Promise { - return new Promise((resolve, reject) => { - const isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE, - inProgressKey = isSubscribe ? 'subscribing' : 'unsubscribing', - commandArgs: Array = [command]; + extendPubSubListeners(type: PubSubType, listeners: PubSubTypeListeners) { + return this.#pushPubSubCommand( + this.#pubSub.extendTypeListeners(type, listeners) + ); + } - let channelsCounter: number; - if (typeof channels === 'number') { // unsubscribe only - channelsCounter = channels; - } else { - commandArgs.push(...channels); - channelsCounter = channels.length; - } + getPubSubListeners(type: PubSubType) { + return this.#pubSub.getTypeListeners(type); + } - this.#pubSubState.isActive = true; - this.#pubSubState[inProgressKey] += channelsCounter; + #pushPubSubCommand(command: PubSubCommand) { + if (command === undefined) return; + return new Promise((resolve, reject) => { this.#waitingToBeSent.push({ - args: commandArgs, - channelsCounter, + args: command.args, + channelsCounter: command.channelsCounter, returnBuffers: true, resolve: () => { - this.#pubSubState[inProgressKey] -= channelsCounter; - this.#pubSubState.subscribed += channelsCounter * (isSubscribe ? 1 : -1); - this.#updatePubSubActiveState(); + command.resolve(); resolve(); }, reject: err => { - this.#pubSubState[inProgressKey] -= channelsCounter * (isSubscribe ? 1 : -1); - this.#updatePubSubActiveState(); + command.reject?.(); reject(err); } }); }); } - #updatePubSubActiveState(): void { - if ( - !this.#pubSubState.subscribed && - !this.#pubSubState.subscribing && - !this.#pubSubState.subscribed - ) { - this.#pubSubState.isActive = false; - } - } - - resubscribe(): Promise | undefined { - this.#pubSubState.subscribed = 0; - this.#pubSubState.subscribing = 0; - this.#pubSubState.unsubscribing = 0; - - const promises = [], - { channels, patterns } = this.#pubSubState.listeners; - - if (channels.size) { - promises.push( - this.#pushPubSubCommand( - PubSubSubscribeCommands.SUBSCRIBE, - [...channels.keys()] - ) - ); - } - - if (patterns.size) { - promises.push( - this.#pushPubSubCommand( - PubSubSubscribeCommands.PSUBSCRIBE, - [...patterns.keys()] - ) - ); - } - - if (promises.length) { - return Promise.all(promises); - } - } - getCommandToSend(): RedisCommandArguments | undefined { const toSend = this.#waitingToBeSent.shift(); if (!toSend) return; @@ -351,39 +240,9 @@ export default class RedisCommandsQueue { this.#decoder.write(chunk); } - #handlePubSubReply(reply: any): boolean { - if (!this.#pubSubState.isActive || !Array.isArray(reply)) return false; - - if (RedisCommandsQueue.#PUB_SUB_MESSAGES.message.equals(reply[0])) { - RedisCommandsQueue.#emitPubSubMessage( - this.#pubSubState.listeners.channels, - reply[2], - reply[1] - ); - } else if (RedisCommandsQueue.#PUB_SUB_MESSAGES.pMessage.equals(reply[0])) { - RedisCommandsQueue.#emitPubSubMessage( - this.#pubSubState.listeners.patterns, - reply[3], - reply[2], - reply[1] - ); - } else if ( - RedisCommandsQueue.#PUB_SUB_MESSAGES.subscribe.equals(reply[0]) || - RedisCommandsQueue.#PUB_SUB_MESSAGES.pSubscribe.equals(reply[0]) || - RedisCommandsQueue.#PUB_SUB_MESSAGES.unsubscribe.equals(reply[0]) || - RedisCommandsQueue.#PUB_SUB_MESSAGES.pUnsubscribe.equals(reply[0]) - ) { - if (--this.#waitingForReply.head!.value.channelsCounter! === 0) { - this.#waitingForReply.shift()!.resolve(); - } - } - - return true; - } - flushWaitingForReply(err: Error): void { this.#decoder.reset(); - this.#pubSubState.isActive = false; + this.#pubSub.reset(); RedisCommandsQueue.#flushQueue(this.#waitingForReply, err); if (!this.#chainInExecution) return; @@ -396,6 +255,8 @@ export default class RedisCommandsQueue { } flushAll(err: Error): void { + this.#decoder.reset(); + this.#pubSub.reset(); RedisCommandsQueue.#flushQueue(this.#waitingForReply, err); RedisCommandsQueue.#flushQueue(this.#waitingToBeSent, err); } diff --git a/packages/client/lib/client/commands.ts b/packages/client/lib/client/commands.ts index 8b5320a5d8..2605962432 100644 --- a/packages/client/lib/client/commands.ts +++ b/packages/client/lib/client/commands.ts @@ -98,6 +98,7 @@ import * as PING from '../commands/PING'; import * as PUBSUB_CHANNELS from '../commands/PUBSUB_CHANNELS'; import * as PUBSUB_NUMPAT from '../commands/PUBSUB_NUMPAT'; import * as PUBSUB_NUMSUB from '../commands/PUBSUB_NUMSUB'; +import * as PUBSUB_SHARDCHANNELS from '../commands/PUBSUB_SHARDCHANNELS'; import * as RANDOMKEY from '../commands/RANDOMKEY'; import * as READONLY from '../commands/READONLY'; import * as READWRITE from '../commands/READWRITE'; @@ -317,6 +318,8 @@ export default { pubSubNumPat: PUBSUB_NUMPAT, PUBSUB_NUMSUB, pubSubNumSub: PUBSUB_NUMSUB, + PUBSUB_SHARDCHANNELS, + pubSubShardChannels: PUBSUB_SHARDCHANNELS, RANDOMKEY, randomKey: RANDOMKEY, READONLY, diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index a6cd7fd4b8..aadf823e57 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -2,14 +2,20 @@ import { strict as assert } from 'assert'; import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils'; import RedisClient, { RedisClientType } from '.'; import { RedisClientMultiCommandType } from './multi-command'; -import { RedisCommandArguments, RedisCommandRawReply, RedisModules, RedisFunctions, RedisScripts } from '../commands'; -import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors'; +import { RedisCommandRawReply, RedisModules, RedisFunctions, RedisScripts } from '../commands'; +import { AbortError, ClientClosedError, ClientOfflineError, ConnectionTimeoutError, DisconnectsClientError, ErrorReply, SocketClosedUnexpectedlyError, WatchError } from '../errors'; import { defineScript } from '../lua-script'; import { spy } from 'sinon'; import { once } from 'events'; import { ClientKillFilters } from '../commands/CLIENT_KILL'; +import { ClusterSlotStates } from '../commands/CLUSTER_SETSLOT'; import { promisify } from 'util'; +// We need to use 'require', because it's not possible with Typescript to import +// function that are exported as 'module.exports = function`, without esModuleInterop +// set to true. +const calculateSlot = require('cluster-key-slot'); + export const SQUARE_SCRIPT = defineScript({ SCRIPT: 'return ARGV[1] * ARGV[1];', NUMBER_OF_KEYS: 0, @@ -817,7 +823,34 @@ describe('Client', () => { } }, GLOBAL.SERVERS.OPEN); - testUtils.testWithClient('should be able to quit in PubSub mode', async client => { + testUtils.testWithClient('should be able to PING in PubSub mode', async client => { + await client.connect(); + + try { + await client.subscribe('channel', () => { + // noop + }); + + const [string, buffer, customString, customBuffer] = await Promise.all([ + client.ping(), + client.ping(client.commandOptions({ returnBuffers: true })), + client.ping('custom'), + client.ping(client.commandOptions({ returnBuffers: true }), 'custom') + ]); + + assert.equal(string, 'pong'); + assert.deepEqual(buffer, Buffer.from('pong')); + assert.equal(customString, 'custom'); + assert.deepEqual(customBuffer, Buffer.from('custom')); + } finally { + await client.disconnect(); + } + }, { + ...GLOBAL.SERVERS.OPEN, + disableClientSetup: true + }); + + testUtils.testWithClient('should be able to QUIT in PubSub mode', async client => { await client.subscribe('channel', () => { // noop }); @@ -826,6 +859,122 @@ describe('Client', () => { assert.equal(client.isOpen, false); }, GLOBAL.SERVERS.OPEN); + + testUtils.testWithClient('should reject GET in PubSub mode', async client => { + await client.connect(); + + try { + await client.subscribe('channel', () => { + // noop + }); + + await assert.rejects(client.get('key'), ErrorReply); + } finally { + await client.disconnect(); + } + }, { + ...GLOBAL.SERVERS.OPEN, + disableClientSetup: true + }); + + describe('shareded PubSub', () => { + testUtils.isVersionGreaterThanHook([7]); + + testUtils.testWithClient('should be able to receive messages', async publisher => { + const subscriber = publisher.duplicate(); + + await subscriber.connect(); + + try { + const listener = spy(); + await subscriber.sSubscribe('channel', listener); + + await Promise.all([ + waitTillBeenCalled(listener), + publisher.sPublish('channel', 'message') + ]); + + assert.ok(listener.calledOnceWithExactly('message', 'channel')); + + await subscriber.sUnsubscribe(); + + // should be able to send commands + await assert.doesNotReject(subscriber.ping()); + } finally { + await subscriber.disconnect(); + } + }, { + ...GLOBAL.SERVERS.OPEN + }); + + testUtils.testWithClient('should emit sharded-channel-moved event', async publisher => { + await publisher.clusterAddSlotsRange({ start: 0, end: 16383 }); + + const subscriber = publisher.duplicate(); + + await subscriber.connect(); + + try { + await subscriber.sSubscribe('channel', () => {}); + + await Promise.all([ + publisher.clusterSetSlot( + calculateSlot('channel'), + ClusterSlotStates.NODE, + await publisher.clusterMyId() + ), + once(subscriber, 'sharded-channel-moved') + ]); + + assert.equal( + await subscriber.ping(), + 'PONG' + ); + } finally { + await subscriber.disconnect(); + } + }, { + serverArguments: ['--cluster-enabled', 'yes'] + }); + }); + + testUtils.testWithClient('should handle errors in SUBSCRIBE', async publisher => { + const subscriber = publisher.duplicate(); + + await subscriber.connect(); + + try { + const listener1 = spy(); + await subscriber.subscribe('1', listener1); + + await publisher.aclSetUser('default', 'resetchannels'); + + + const listener2 = spy(); + await assert.rejects(subscriber.subscribe('2', listener2)); + + await Promise.all([ + waitTillBeenCalled(listener1), + publisher.aclSetUser('default', 'allchannels'), + publisher.publish('1', 'message'), + ]); + assert.ok(listener1.calledOnceWithExactly('message', '1')); + + await subscriber.subscribe('2', listener2); + + await Promise.all([ + waitTillBeenCalled(listener2), + publisher.publish('2', 'message'), + ]); + assert.ok(listener2.calledOnceWithExactly('message', '2')); + } finally { + await subscriber.disconnect(); + } + }, { + // this test change ACL rules, running in isolated server + serverArguments: [], + minimumDockerVersion: [6 ,2] // ACL PubSub rules were added in Redis 6.2 + }); }); testUtils.testWithClient('ConnectionTimeoutError', async client => { diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index f1e83edb41..ae5e2fe5e8 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -1,7 +1,7 @@ import COMMANDS from './commands'; import { RedisCommand, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisFunctions, RedisModules, RedisExtensions, RedisScript, RedisScripts, RedisCommandSignature, ConvertArgumentType, RedisFunction, ExcludeMappedString, RedisCommands } from '../commands'; import RedisSocket, { RedisSocketOptions, RedisTlsSocketOptions } from './socket'; -import RedisCommandsQueue, { PubSubListener, PubSubSubscribeCommands, PubSubUnsubscribeCommands, QueueCommandOptions } from './commands-queue'; +import RedisCommandsQueue, { QueueCommandOptions } from './commands-queue'; import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command'; import { RedisMultiQueuedCommand } from '../multi-command'; import { EventEmitter } from 'events'; @@ -14,23 +14,57 @@ import { Pool, Options as PoolOptions, createPool } from 'generic-pool'; import { ClientClosedError, ClientOfflineError, DisconnectsClientError } from '../errors'; import { URL } from 'url'; import { TcpSocketConnectOpts } from 'net'; +import { PubSubType, PubSubListener, PubSubTypeListeners, ChannelListeners } from './pub-sub'; export interface RedisClientOptions< M extends RedisModules = RedisModules, F extends RedisFunctions = RedisFunctions, S extends RedisScripts = RedisScripts > extends RedisExtensions { + /** + * `redis[s]://[[username][:password]@][host][:port][/db-number]` + * See [`redis`](https://www.iana.org/assignments/uri-schemes/prov/redis) and [`rediss`](https://www.iana.org/assignments/uri-schemes/prov/rediss) IANA registration for more details + */ url?: string; + /** + * Socket connection properties + */ socket?: RedisSocketOptions; + /** + * ACL username ([see ACL guide](https://redis.io/topics/acl)) + */ username?: string; + /** + * ACL password or the old "--requirepass" password + */ password?: string; + /** + * Client name ([see `CLIENT SETNAME`](https://redis.io/commands/client-setname)) + */ name?: string; + /** + * Redis database number (see [`SELECT`](https://redis.io/commands/select) command) + */ database?: number; + /** + * Maximum length of the client's internal command queue + */ commandsQueueMaxLength?: number; + /** + * When `true`, commands are rejected when the client is reconnecting. + * When `false`, commands are queued for execution after reconnection. + */ disableOfflineQueue?: boolean; + /** + * Connect in [`READONLY`](https://redis.io/commands/readonly) mode + */ readonly?: boolean; legacyMode?: boolean; isolationPoolOptions?: PoolOptions; + /** + * Send `PING` command at interval (in ms). + * Useful with Redis deployments that do not use TCP Keep-Alive. + */ pingInterval?: number; } @@ -171,6 +205,10 @@ export default class RedisClient< return this.#socket.isReady; } + get isPubSubActive() { + return this.#queue.isPubSubActive; + } + get v4(): Record { if (!this.#options?.legacyMode) { throw new Error('the client is not in "legacy mode"'); @@ -215,7 +253,10 @@ export default class RedisClient< } #initiateQueue(): RedisCommandsQueue { - return new RedisCommandsQueue(this.#options?.commandsQueueMaxLength); + return new RedisCommandsQueue( + this.#options?.commandsQueueMaxLength, + (channel, listeners) => this.emit('sharded-channel-moved', channel, listeners) + ); } #initiateSocket(): RedisSocket { @@ -377,8 +418,8 @@ export default class RedisClient< }); } - async connect(): Promise { - await this.#socket.connect(); + connect(): Promise { + return this.#socket.connect(); } async commandsExecutor( @@ -500,18 +541,9 @@ export default class RedisClient< select = this.SELECT; - #subscribe( - command: PubSubSubscribeCommands, - channels: string | Array, - listener: PubSubListener, - bufferMode?: T - ): Promise { - const promise = this.#queue.subscribe( - command, - channels, - listener, - bufferMode - ); + #pubSubCommand(promise: Promise | undefined) { + if (promise === undefined) return Promise.resolve(); + this.#tick(); return promise; } @@ -521,77 +553,127 @@ export default class RedisClient< listener: PubSubListener, bufferMode?: T ): Promise { - return this.#subscribe( - PubSubSubscribeCommands.SUBSCRIBE, - channels, - listener, - bufferMode + return this.#pubSubCommand( + this.#queue.subscribe( + PubSubType.CHANNELS, + channels, + listener, + bufferMode + ) ); } subscribe = this.SUBSCRIBE; + + UNSUBSCRIBE( + channels?: string | Array, + listener?: PubSubListener, + bufferMode?: T + ): Promise { + return this.#pubSubCommand( + this.#queue.unsubscribe( + PubSubType.CHANNELS, + channels, + listener, + bufferMode + ) + ); + } + + unsubscribe = this.UNSUBSCRIBE; + PSUBSCRIBE( patterns: string | Array, listener: PubSubListener, bufferMode?: T ): Promise { - return this.#subscribe( - PubSubSubscribeCommands.PSUBSCRIBE, - patterns, - listener, - bufferMode + return this.#pubSubCommand( + this.#queue.subscribe( + PubSubType.PATTERNS, + patterns, + listener, + bufferMode + ) ); } pSubscribe = this.PSUBSCRIBE; - #unsubscribe( - command: PubSubUnsubscribeCommands, - channels?: string | Array, + PUNSUBSCRIBE( + patterns?: string | Array, listener?: PubSubListener, bufferMode?: T ): Promise { - const promise = this.#queue.unsubscribe(command, channels, listener, bufferMode); - this.#tick(); - return promise; + return this.#pubSubCommand( + this.#queue.unsubscribe( + PubSubType.PATTERNS, + patterns, + listener, + bufferMode + ) + ); } - UNSUBSCRIBE( - channels?: string | Array, - listener?: PubSubListener, + pUnsubscribe = this.PUNSUBSCRIBE; + + SSUBSCRIBE( + channels: string | Array, + listener: PubSubListener, bufferMode?: T ): Promise { - return this.#unsubscribe( - PubSubUnsubscribeCommands.UNSUBSCRIBE, - channels, - listener, - bufferMode + return this.#pubSubCommand( + this.#queue.subscribe( + PubSubType.SHARDED, + channels, + listener, + bufferMode + ) ); } - unsubscribe = this.UNSUBSCRIBE; + sSubscribe = this.SSUBSCRIBE; - PUNSUBSCRIBE( - patterns?: string | Array, + SUNSUBSCRIBE( + channels?: string | Array, listener?: PubSubListener, bufferMode?: T ): Promise { - return this.#unsubscribe( - PubSubUnsubscribeCommands.PUNSUBSCRIBE, - patterns, - listener, - bufferMode + return this.#pubSubCommand( + this.#queue.unsubscribe( + PubSubType.SHARDED, + channels, + listener, + bufferMode + ) ); } - pUnsubscribe = this.PUNSUBSCRIBE; + sUnsubscribe = this.SUNSUBSCRIBE; + + getPubSubListeners(type: PubSubType) { + return this.#queue.getPubSubListeners(type); + } + + extendPubSubChannelListeners( + type: PubSubType, + channel: string, + listeners: ChannelListeners + ) { + return this.#pubSubCommand( + this.#queue.extendPubSubChannelListeners(type, channel, listeners) + ); + } + + extendPubSubListeners(type: PubSubType, listeners: PubSubTypeListeners) { + return this.#pubSubCommand( + this.#queue.extendPubSubListeners(type, listeners) + ); + } QUIT(): Promise { return this.#socket.quit(async () => { - const quitPromise = this.#queue.addCommand(['QUIT'], { - ignorePubSubMode: true - }); + const quitPromise = this.#queue.addCommand(['QUIT']); this.#tick(); const [reply] = await Promise.all([ quitPromise, diff --git a/packages/client/lib/client/pub-sub.spec.ts b/packages/client/lib/client/pub-sub.spec.ts new file mode 100644 index 0000000000..8b9f16732c --- /dev/null +++ b/packages/client/lib/client/pub-sub.spec.ts @@ -0,0 +1,151 @@ +import { strict as assert } from 'assert'; +import { PubSub, PubSubType } from './pub-sub'; + +describe('PubSub', () => { + const TYPE = PubSubType.CHANNELS, + CHANNEL = 'channel', + LISTENER = () => {}; + + describe('subscribe to new channel', () => { + function createAndSubscribe() { + const pubSub = new PubSub(), + command = pubSub.subscribe(TYPE, CHANNEL, LISTENER); + + assert.equal(pubSub.isActive, true); + assert.ok(command); + assert.equal(command.channelsCounter, 1); + + return { + pubSub, + command + }; + } + + it('resolve', () => { + const { pubSub, command } = createAndSubscribe(); + + command.resolve(); + + assert.equal(pubSub.isActive, true); + }); + + it('reject', () => { + const { pubSub, command } = createAndSubscribe(); + + assert.ok(command.reject); + command.reject(); + + assert.equal(pubSub.isActive, false); + }); + }); + + it('subscribe to already subscribed channel', () => { + const pubSub = new PubSub(), + firstSubscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER); + assert.ok(firstSubscribe); + + const secondSubscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER); + assert.ok(secondSubscribe); + + firstSubscribe.resolve(); + + assert.equal( + pubSub.subscribe(TYPE, CHANNEL, LISTENER), + undefined + ); + }); + + it('unsubscribe all', () => { + const pubSub = new PubSub(); + + const subscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER); + assert.ok(subscribe); + subscribe.resolve(); + assert.equal(pubSub.isActive, true); + + const unsubscribe = pubSub.unsubscribe(TYPE); + assert.equal(pubSub.isActive, true); + assert.ok(unsubscribe); + unsubscribe.resolve(); + assert.equal(pubSub.isActive, false); + }); + + describe('unsubscribe from channel', () => { + it('when not subscribed', () => { + const pubSub = new PubSub(), + unsubscribe = pubSub.unsubscribe(TYPE, CHANNEL); + assert.ok(unsubscribe); + unsubscribe.resolve(); + assert.equal(pubSub.isActive, false); + }); + + it('when already subscribed', () => { + const pubSub = new PubSub(), + subscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER); + assert.ok(subscribe); + subscribe.resolve(); + assert.equal(pubSub.isActive, true); + + const unsubscribe = pubSub.unsubscribe(TYPE, CHANNEL); + assert.equal(pubSub.isActive, true); + assert.ok(unsubscribe); + unsubscribe.resolve(); + assert.equal(pubSub.isActive, false); + }); + }); + + describe('unsubscribe from listener', () => { + it('when it\'s the only listener', () => { + const pubSub = new PubSub(), + subscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER); + assert.ok(subscribe); + subscribe.resolve(); + assert.equal(pubSub.isActive, true); + + const unsubscribe = pubSub.unsubscribe(TYPE, CHANNEL, LISTENER); + assert.ok(unsubscribe); + unsubscribe.resolve(); + assert.equal(pubSub.isActive, false); + }); + + it('when there are more listeners', () => { + const pubSub = new PubSub(), + subscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER); + assert.ok(subscribe); + subscribe.resolve(); + assert.equal(pubSub.isActive, true); + + assert.equal( + pubSub.subscribe(TYPE, CHANNEL, () => {}), + undefined + ); + + assert.equal( + pubSub.unsubscribe(TYPE, CHANNEL, LISTENER), + undefined + ); + }); + + describe('non-existing listener', () => { + it('on subscribed channel', () => { + const pubSub = new PubSub(), + subscribe = pubSub.subscribe(TYPE, CHANNEL, LISTENER); + assert.ok(subscribe); + subscribe.resolve(); + assert.equal(pubSub.isActive, true); + + assert.equal( + pubSub.unsubscribe(TYPE, CHANNEL, () => {}), + undefined + ); + assert.equal(pubSub.isActive, true); + }); + + it('on unsubscribed channel', () => { + const pubSub = new PubSub(); + assert.ok(pubSub.unsubscribe(TYPE, CHANNEL, () => {})); + assert.equal(pubSub.isActive, false); + }); + }); + }); +}); diff --git a/packages/client/lib/client/pub-sub.ts b/packages/client/lib/client/pub-sub.ts new file mode 100644 index 0000000000..8efc9d8a2e --- /dev/null +++ b/packages/client/lib/client/pub-sub.ts @@ -0,0 +1,408 @@ +import { RedisCommandArgument } from "../commands"; + +export enum PubSubType { + CHANNELS = 'CHANNELS', + PATTERNS = 'PATTERNS', + SHARDED = 'SHARDED' +} + +const COMMANDS = { + [PubSubType.CHANNELS]: { + subscribe: Buffer.from('subscribe'), + unsubscribe: Buffer.from('unsubscribe'), + message: Buffer.from('message') + }, + [PubSubType.PATTERNS]: { + subscribe: Buffer.from('psubscribe'), + unsubscribe: Buffer.from('punsubscribe'), + message: Buffer.from('pmessage') + }, + [PubSubType.SHARDED]: { + subscribe: Buffer.from('ssubscribe'), + unsubscribe: Buffer.from('sunsubscribe'), + message: Buffer.from('smessage') + } +}; + +export type PubSubListener< + RETURN_BUFFERS extends boolean = false +> = (message: T, channel: T) => unknown; + +export interface ChannelListeners { + unsubscribing: boolean; + buffers: Set>; + strings: Set>; +} + +export type PubSubTypeListeners = Map; + +type Listeners = Record; + +export type PubSubCommand = ReturnType< + typeof PubSub.prototype.subscribe | + typeof PubSub.prototype.unsubscribe | + typeof PubSub.prototype.extendTypeListeners +>; + +export class PubSub { + static isStatusReply(reply: Array): boolean { + return ( + COMMANDS[PubSubType.CHANNELS].subscribe.equals(reply[0]) || + COMMANDS[PubSubType.CHANNELS].unsubscribe.equals(reply[0]) || + COMMANDS[PubSubType.PATTERNS].subscribe.equals(reply[0]) || + COMMANDS[PubSubType.PATTERNS].unsubscribe.equals(reply[0]) || + COMMANDS[PubSubType.SHARDED].subscribe.equals(reply[0]) + ); + } + + static isShardedUnsubscribe(reply: Array): boolean { + return COMMANDS[PubSubType.SHARDED].unsubscribe.equals(reply[0]); + } + + static #channelsArray(channels: string | Array) { + return (Array.isArray(channels) ? channels : [channels]); + } + + static #listenersSet( + listeners: ChannelListeners, + returnBuffers?: T + ) { + return (returnBuffers ? listeners.buffers : listeners.strings); + } + + #subscribing = 0; + + #isActive = false; + + get isActive() { + return this.#isActive; + } + + #listeners: Listeners = { + [PubSubType.CHANNELS]: new Map(), + [PubSubType.PATTERNS]: new Map(), + [PubSubType.SHARDED]: new Map() + }; + + subscribe( + type: PubSubType, + channels: string | Array, + listener: PubSubListener, + returnBuffers?: T + ) { + const args: Array = [COMMANDS[type].subscribe], + channelsArray = PubSub.#channelsArray(channels); + for (const channel of channelsArray) { + let channelListeners = this.#listeners[type].get(channel); + if (!channelListeners || channelListeners.unsubscribing) { + args.push(channel); + } + } + + if (args.length === 1) { + // all channels are already subscribed, add listeners without issuing a command + for (const channel of channelsArray) { + PubSub.#listenersSet( + this.#listeners[type].get(channel)!, + returnBuffers + ).add(listener); + } + return; + } + + this.#isActive = true; + this.#subscribing++; + return { + args, + channelsCounter: args.length - 1, + resolve: () => { + this.#subscribing--; + for (const channel of channelsArray) { + let listeners = this.#listeners[type].get(channel); + if (!listeners) { + listeners = { + unsubscribing: false, + buffers: new Set(), + strings: new Set() + }; + this.#listeners[type].set(channel, listeners); + } + + PubSub.#listenersSet(listeners, returnBuffers).add(listener); + } + }, + reject: () => { + this.#subscribing--; + this.#updateIsActive(); + } + }; + } + + extendChannelListeners( + type: PubSubType, + channel: string, + listeners: ChannelListeners + ) { + if (!this.#extendChannelListeners(type, channel, listeners)) return; + + this.#isActive = true; + this.#subscribing++; + return { + args: [ + COMMANDS[type].subscribe, + channel + ], + channelsCounter: 1, + resolve: () => this.#subscribing--, + reject: () => { + this.#subscribing--; + this.#updateIsActive(); + } + }; + } + + #extendChannelListeners( + type: PubSubType, + channel: string, + listeners: ChannelListeners + ) { + const existingListeners = this.#listeners[type].get(channel); + if (!existingListeners) { + this.#listeners[type].set(channel, listeners); + return true; + } + + for (const listener of listeners.buffers) { + existingListeners.buffers.add(listener); + } + + for (const listener of listeners.strings) { + existingListeners.strings.add(listener); + } + + return false; + } + + extendTypeListeners(type: PubSubType, listeners: PubSubTypeListeners) { + const args: Array = [COMMANDS[type].subscribe]; + for (const [channel, channelListeners] of listeners) { + if (this.#extendChannelListeners(type, channel, channelListeners)) { + args.push(channel); + } + } + + if (args.length === 1) return; + + this.#isActive = true; + this.#subscribing++; + return { + args, + channelsCounter: args.length - 1, + resolve: () => this.#subscribing--, + reject: () => { + this.#subscribing--; + this.#updateIsActive(); + } + }; + } + + unsubscribe( + type: PubSubType, + channels?: string | Array, + listener?: PubSubListener, + returnBuffers?: T + ) { + const listeners = this.#listeners[type]; + if (!channels) { + return this.#unsubscribeCommand( + [COMMANDS[type].unsubscribe], + // cannot use `this.#subscribed` because there might be some `SUBSCRIBE` commands in the queue + // cannot use `this.#subscribed + this.#subscribing` because some `SUBSCRIBE` commands might fail + NaN, + () => listeners.clear() + ); + } + + const channelsArray = PubSub.#channelsArray(channels); + if (!listener) { + return this.#unsubscribeCommand( + [COMMANDS[type].unsubscribe, ...channelsArray], + channelsArray.length, + () => { + for (const channel of channelsArray) { + listeners.delete(channel); + } + } + ); + } + + const args: Array = [COMMANDS[type].unsubscribe]; + for (const channel of channelsArray) { + const sets = listeners.get(channel); + if (sets) { + let current, + other; + if (returnBuffers) { + current = sets.buffers; + other = sets.strings; + } else { + current = sets.strings; + other = sets.buffers; + } + + const currentSize = current.has(listener) ? current.size - 1 : current.size; + if (currentSize !== 0 || other.size !== 0) continue; + sets.unsubscribing = true; + } + + args.push(channel); + } + + if (args.length === 1) { + // all channels has other listeners, + // delete the listeners without issuing a command + for (const channel of channelsArray) { + PubSub.#listenersSet( + listeners.get(channel)!, + returnBuffers + ).delete(listener); + } + return; + } + + return this.#unsubscribeCommand( + args, + args.length - 1, + () => { + for (const channel of channelsArray) { + const sets = listeners.get(channel); + if (!sets) continue; + + (returnBuffers ? sets.buffers : sets.strings).delete(listener); + if (sets.buffers.size === 0 && sets.strings.size === 0) { + listeners.delete(channel); + } + } + } + ); + } + + #unsubscribeCommand( + args: Array, + channelsCounter: number, + removeListeners: () => void + ) { + return { + args, + channelsCounter, + resolve: () => { + removeListeners(); + this.#updateIsActive(); + }, + reject: undefined // use the same structure as `subscribe` + }; + } + + #updateIsActive() { + this.#isActive = ( + this.#listeners[PubSubType.CHANNELS].size !== 0 || + this.#listeners[PubSubType.PATTERNS].size !== 0 || + this.#listeners[PubSubType.CHANNELS].size !== 0 || + this.#subscribing !== 0 + ); + } + + reset() { + this.#isActive = false; + this.#subscribing = 0; + } + + resubscribe(): Array { + const commands = []; + for (const [type, listeners] of Object.entries(this.#listeners)) { + if (!listeners.size) continue; + + this.#isActive = true; + this.#subscribing++; + const callback = () => this.#subscribing--; + commands.push({ + args: [ + COMMANDS[type as PubSubType].subscribe, + ...listeners.keys() + ], + channelsCounter: listeners.size, + resolve: callback, + reject: callback + }); + } + + return commands; + } + + handleMessageReply(reply: Array): boolean { + if (COMMANDS[PubSubType.CHANNELS].message.equals(reply[0])) { + this.#emitPubSubMessage( + PubSubType.CHANNELS, + reply[2], + reply[1] + ); + return true; + } else if (COMMANDS[PubSubType.PATTERNS].message.equals(reply[0])) { + this.#emitPubSubMessage( + PubSubType.PATTERNS, + reply[3], + reply[2], + reply[1] + ); + return true; + } else if (COMMANDS[PubSubType.SHARDED].message.equals(reply[0])) { + this.#emitPubSubMessage( + PubSubType.SHARDED, + reply[2], + reply[1] + ); + return true; + } + + return false; + } + + removeShardedListeners(channel: string): ChannelListeners { + const listeners = this.#listeners[PubSubType.SHARDED].get(channel)!; + this.#listeners[PubSubType.SHARDED].delete(channel); + this.#updateIsActive(); + return listeners; + } + + #emitPubSubMessage( + type: PubSubType, + message: Buffer, + channel: Buffer, + pattern?: Buffer + ): void { + const keyString = (pattern ?? channel).toString(), + listeners = this.#listeners[type].get(keyString); + + if (!listeners) return; + + for (const listener of listeners.buffers) { + listener(message, channel); + } + + if (!listeners.strings.size) return; + + const channelString = pattern ? channel.toString() : keyString, + messageString = channelString === '__redis__:invalidate' ? + // https://github.com/redis/redis/pull/7469 + // https://github.com/redis/redis/issues/7463 + (message === null ? null : (message as any as Array).map(x => x.toString())) as any : + message.toString(); + for (const listener of listeners.strings) { + listener(messageString, channelString); + } + } + + getTypeListeners(type: PubSubType): PubSubTypeListeners { + return this.#listeners[type]; + } +} diff --git a/packages/client/lib/client/socket.spec.ts b/packages/client/lib/client/socket.spec.ts index c5862130cf..86929b227a 100644 --- a/packages/client/lib/client/socket.spec.ts +++ b/packages/client/lib/client/socket.spec.ts @@ -1,5 +1,6 @@ -import { strict as assert } from 'assert'; +import { strict as assert } from 'node:assert'; import { spy } from 'sinon'; +import { once } from 'node:events'; import RedisSocket, { RedisSocketOptions } from './socket'; describe('Socket', () => { @@ -17,16 +18,42 @@ describe('Socket', () => { } describe('reconnectStrategy', () => { + it('false', async () => { + const socket = createSocket({ + host: 'error', + connectTimeout: 1, + reconnectStrategy: false + }); + + await assert.rejects(socket.connect()); + + assert.equal(socket.isOpen, false); + }); + + it('0', async () => { + const socket = createSocket({ + host: 'error', + connectTimeout: 1, + reconnectStrategy: 0 + }); + + socket.connect(); + await once(socket, 'error'); + assert.equal(socket.isOpen, true); + assert.equal(socket.isReady, false); + socket.disconnect(); + assert.equal(socket.isOpen, false); + }); + it('custom strategy', async () => { - const numberOfRetries = 10; + const numberOfRetries = 3; const reconnectStrategy = spy((retries: number) => { assert.equal(retries + 1, reconnectStrategy.callCount); if (retries === numberOfRetries) return new Error(`${numberOfRetries}`); - const time = retries * 2; - return time; + return 0; }); const socket = createSocket({ diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 345ac1d3e3..4c64f89955 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -6,10 +6,26 @@ import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyErro import { promiseTimeout } from '../utils'; export interface RedisSocketCommonOptions { + /** + * Connection Timeout (in milliseconds) + */ connectTimeout?: number; + /** + * Toggle [`Nagle's algorithm`](https://nodejs.org/api/net.html#net_socket_setnodelay_nodelay) + */ noDelay?: boolean; + /** + * Toggle [`keep-alive`](https://nodejs.org/api/net.html#net_socket_setkeepalive_enable_initialdelay) + */ keepAlive?: number | false; - reconnectStrategy?(retries: number): number | Error; + /** + * When the socket closes unexpectedly (without calling `.quit()`/`.disconnect()`), the client uses `reconnectStrategy` to decide what to do. The following values are supported: + * 1. `false` -> do not reconnect, close the client and flush the command queue. + * 2. `number` -> wait for `X` milliseconds before reconnecting. + * 3. `(retries: number, cause: Error) => false | number | Error` -> `number` is the same as configuring a `number` directly, `Error` is the same as `false`, but with a custom error. + * Defaults to `retries => Math.min(retries * 50, 500)` + */ + reconnectStrategy?: false | number | ((retries: number, cause: Error) => false | Error | number); } type RedisNetSocketOptions = Partial & { @@ -83,23 +99,42 @@ export default class RedisSocket extends EventEmitter { this.#options = RedisSocket.#initiateOptions(options); } - reconnectStrategy(retries: number): number | Error { - if (this.#options.reconnectStrategy) { + #reconnectStrategy(retries: number, cause: Error) { + if (this.#options.reconnectStrategy === false) { + return false; + } else if (typeof this.#options.reconnectStrategy === 'number') { + return this.#options.reconnectStrategy; + } else if (this.#options.reconnectStrategy) { try { - const retryIn = this.#options.reconnectStrategy(retries); - if (typeof retryIn !== 'number' && !(retryIn instanceof Error)) { - throw new TypeError('Reconnect strategy should return `number | Error`'); + const retryIn = this.#options.reconnectStrategy(retries, cause); + if (retryIn !== false && !(retryIn instanceof Error) && typeof retryIn !== 'number') { + throw new TypeError(`Reconnect strategy should return \`false | Error | number\`, got ${retryIn} instead`); } return retryIn; } catch (err) { - this.emit('error', err); + this.emit('error', err); } } return Math.min(retries * 50, 500); } + #shouldReconnect(retries: number, cause: Error) { + const retryIn = this.#reconnectStrategy(retries, cause); + if (retryIn === false) { + this.#isOpen = false; + this.emit('error', cause); + return cause; + } else if (retryIn instanceof Error) { + this.#isOpen = false; + this.emit('error', cause); + return new ReconnectStrategyError(retryIn, cause); + } + + return retryIn; + } + async connect(): Promise { if (this.#isOpen) { throw new Error('Socket already opened'); @@ -109,13 +144,9 @@ export default class RedisSocket extends EventEmitter { return this.#connect(); } - async #connect(hadError?: boolean): Promise { + async #connect(): Promise { let retries = 0; do { - if (retries > 0 || hadError) { - this.emit('reconnecting'); - } - try { this.#socket = await this.#createSocket(); this.#writableNeedDrain = false; @@ -131,17 +162,17 @@ export default class RedisSocket extends EventEmitter { this.#isReady = true; this.emit('ready'); } catch (err) { - const retryIn = this.reconnectStrategy(retries); - if (retryIn instanceof Error) { - this.#isOpen = false; - this.emit('error', err); - throw new ReconnectStrategyError(retryIn, err); + const retryIn = this.#shouldReconnect(retries, err as Error); + if (typeof retryIn !== 'number') { + throw retryIn; } this.emit('error', err); await promiseTimeout(retryIn); } + retries++; + this.emit('reconnecting'); } while (this.#isOpen && !this.#isReady); } @@ -203,9 +234,10 @@ export default class RedisSocket extends EventEmitter { this.#isReady = false; this.emit('error', err); - if (!this.#isOpen) return; - - this.#connect(true).catch(() => { + if (!this.#isOpen || typeof this.#shouldReconnect(0, err) !== 'number') return; + + this.emit('reconnecting'); + this.#connect().catch(() => { // the error was already emitted, silently ignore it }); } diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index d23ef569f3..2804f499f5 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -1,157 +1,254 @@ import RedisClient, { InstantiableRedisClient, RedisClientType } from '../client'; -import { RedisClusterMasterNode, RedisClusterReplicaNode } from '../commands/CLUSTER_NODES'; import { RedisClusterClientOptions, RedisClusterOptions } from '.'; import { RedisCommandArgument, RedisFunctions, RedisModules, RedisScripts } from '../commands'; import { RootNodesUnavailableError } from '../errors'; +import { ClusterSlotsNode } from '../commands/CLUSTER_SLOTS'; +import { types } from 'node:util'; +import { ChannelListeners, PubSubType, PubSubTypeListeners } from '../client/pub-sub'; +import { EventEmitter } from 'node:stream'; // We need to use 'require', because it's not possible with Typescript to import // function that are exported as 'module.exports = function`, without esModuleInterop // set to true. const calculateSlot = require('cluster-key-slot'); -export interface ClusterNode< +interface NodeAddress { + host: string; + port: number; +} + +export type NodeAddressMap = { + [address: string]: NodeAddress; +} | ((address: string) => NodeAddress | undefined); + +type ValueOrPromise = T | Promise; + +type ClientOrPromise< + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts +> = ValueOrPromise>; + +export interface Node< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts > { - id: string; - client: RedisClientType; + address: string; + client?: ClientOrPromise; } -interface NodeAddress { +export interface ShardNode< + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts +> extends Node { + id: string; host: string; port: number; + readonly: boolean; } -export type NodeAddressMap = { - [address: string]: NodeAddress; -} | ((address: string) => NodeAddress | undefined); +export interface MasterNode< + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts +> extends ShardNode { + pubSubClient?: ClientOrPromise; +} -interface SlotNodes< +export interface Shard< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts > { - master: ClusterNode; - replicas: Array>; - clientIterator: IterableIterator> | undefined; + master: MasterNode; + replicas?: Array>; + nodesIterator?: IterableIterator>; } -type OnError = (err: unknown) => void; +type ShardWithReplicas< + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts +> = Shard & Required, 'replicas'>>; + +export type PubSubNode< + M extends RedisModules, + F extends RedisFunctions, + S extends RedisScripts +> = Required>; + +type PubSubToResubscribe = Record< + PubSubType.CHANNELS | PubSubType.PATTERNS, + PubSubTypeListeners +>; + +export type OnShardedChannelMovedError = ( + err: unknown, + channel: string, + listeners?: ChannelListeners +) => void; export default class RedisClusterSlots< M extends RedisModules, F extends RedisFunctions, S extends RedisScripts > { + static #SLOTS = 16384; + readonly #options: RedisClusterOptions; readonly #Client: InstantiableRedisClient; - readonly #onError: OnError; - readonly #nodeByAddress = new Map>(); - readonly #slots: Array> = []; + readonly #emit: EventEmitter['emit']; + slots = new Array>(RedisClusterSlots.#SLOTS); + shards = new Array>(); + masters = new Array>(); + replicas = new Array>(); + readonly nodeByAddress = new Map | ShardNode>(); + pubSubNode?: PubSubNode; + + #isOpen = false; + + get isOpen() { + return this.#isOpen; + } - constructor(options: RedisClusterOptions, onError: OnError) { + constructor( + options: RedisClusterOptions, + emit: EventEmitter['emit'] + ) { this.#options = options; this.#Client = RedisClient.extend(options); - this.#onError = onError; + this.#emit = emit; } - async connect(): Promise { - for (const rootNode of this.#options.rootNodes) { - if (await this.#discoverNodes(rootNode)) return; + async connect() { + if (this.#isOpen) { + throw new Error('Cluster already open'); } - throw new RootNodesUnavailableError(); - } - - async #discoverNodes(clientOptions?: RedisClusterClientOptions): Promise { - const client = this.#initiateClient(clientOptions); - - await client.connect(); - + this.#isOpen = true; try { - await this.#reset(await client.clusterNodes()); - return true; + await this.#discoverWithRootNodes(); } catch (err) { - this.#onError(err); - return false; - } finally { - if (client.isOpen) { - await client.disconnect(); - } + this.#isOpen = false; + throw err; } } - #runningRediscoverPromise?: Promise; + async #discoverWithRootNodes() { + let start = Math.floor(Math.random() * this.#options.rootNodes.length); + for (let i = start; i < this.#options.rootNodes.length; i++) { + if (await this.#discover(this.#options.rootNodes[i])) return; + } - async rediscover(startWith: RedisClientType): Promise { - if (!this.#runningRediscoverPromise) { - this.#runningRediscoverPromise = this.#rediscover(startWith) - .finally(() => this.#runningRediscoverPromise = undefined); + for (let i = 0; i < start; i++) { + if (await this.#discover(this.#options.rootNodes[i])) return; } - return this.#runningRediscoverPromise; + throw new RootNodesUnavailableError(); } - async #rediscover(startWith: RedisClientType): Promise { - if (await this.#discoverNodes(startWith.options)) return; + #resetSlots() { + this.slots = new Array(RedisClusterSlots.#SLOTS); + this.shards = []; + this.masters = []; + this.replicas = []; + this.#randomNodeIterator = undefined; + } - for (const { client } of this.#nodeByAddress.values()) { - if (client === startWith) continue; + async #discover(rootNode?: RedisClusterClientOptions) { + this.#resetSlots(); + const addressesInUse = new Set(); - if (await this.#discoverNodes(client.options)) return; - } - - throw new Error('None of the cluster nodes is available'); - } + try { + const shards = await this.#getShards(rootNode), + promises: Array> = [], + eagerConnect = this.#options.minimizeConnections !== true; + for (const { from, to, master, replicas } of shards) { + const shard: Shard = { + master: this.#initiateSlotNode(master, false, eagerConnect, addressesInUse, promises) + }; + + if (this.#options.useReplicas) { + shard.replicas = replicas.map(replica => + this.#initiateSlotNode(replica, true, eagerConnect, addressesInUse, promises) + ); + } - async #reset(masters: Array): Promise { - // Override this.#slots and add not existing clients to this.#nodeByAddress - const promises: Array> = [], - clientsInUse = new Set(); - for (const master of masters) { - const slot = { - master: this.#initiateClientForNode(master, false, clientsInUse, promises), - replicas: this.#options.useReplicas ? - master.replicas.map(replica => this.#initiateClientForNode(replica, true, clientsInUse, promises)) : - [], - clientIterator: undefined // will be initiated in use - }; + this.shards.push(shard); - for (const { from, to } of master.slots) { for (let i = from; i <= to; i++) { - this.#slots[i] = slot; + this.slots[i] = shard; } } - } - // Remove unused clients from this.#nodeByAddress using clientsInUse - for (const [address, { client }] of this.#nodeByAddress.entries()) { - if (clientsInUse.has(address)) continue; + if (this.pubSubNode && !addressesInUse.has(this.pubSubNode.address)) { + if (types.isPromise(this.pubSubNode.client)) { + promises.push( + this.pubSubNode.client.then(client => client.disconnect()) + ); + this.pubSubNode = undefined; + } else { + promises.push(this.pubSubNode.client.disconnect()); + + const channelsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.CHANNELS), + patternsListeners = this.pubSubNode.client.getPubSubListeners(PubSubType.PATTERNS); + + if (channelsListeners.size || patternsListeners.size) { + promises.push( + this.#initiatePubSubClient({ + [PubSubType.CHANNELS]: channelsListeners, + [PubSubType.PATTERNS]: patternsListeners + }) + ); + } + } + } - promises.push(client.disconnect()); - this.#nodeByAddress.delete(address); - } + for (const [address, node] of this.nodeByAddress.entries()) { + if (addressesInUse.has(address)) continue; - await Promise.all(promises); - } + if (node.client) { + promises.push( + this.#execOnNodeClient(node.client, client => client.disconnect()) + ); + } + + const { pubSubClient } = node as MasterNode; + if (pubSubClient) { + promises.push( + this.#execOnNodeClient(pubSubClient, client => client.disconnect()) + ); + } - #clientOptionsDefaults(options?: RedisClusterClientOptions): RedisClusterClientOptions | undefined { - if (!this.#options.defaults) return options; + this.nodeByAddress.delete(address); + } - return { - ...this.#options.defaults, - ...options, - socket: this.#options.defaults.socket && options?.socket ? { - ...this.#options.defaults.socket, - ...options.socket - } : this.#options.defaults.socket ?? options?.socket - }; + await Promise.all(promises); + + return true; + } catch (err) { + this.#emit('error', err); + return false; + } } - #initiateClient(options?: RedisClusterClientOptions): RedisClientType { - return new this.#Client(this.#clientOptionsDefaults(options)) - .on('error', this.#onError); + async #getShards(rootNode?: RedisClusterClientOptions) { + const client = new this.#Client( + this.#clientOptionsDefaults(rootNode, true) + ); + + client.on('error', err => this.#emit('error', err)); + + await client.connect(); + + try { + // using `CLUSTER SLOTS` and not `CLUSTER SHARDS` to support older versions + return await client.clusterSlots(); + } finally { + await client.disconnect(); + } } #getNodeAddress(address: string): NodeAddress | undefined { @@ -164,130 +261,361 @@ export default class RedisClusterSlots< } } - #initiateClientForNode( - nodeData: RedisClusterMasterNode | RedisClusterReplicaNode, - readonly: boolean, - clientsInUse: Set, - promises: Array> - ): ClusterNode { - const address = `${nodeData.host}:${nodeData.port}`; - clientsInUse.add(address); + #clientOptionsDefaults( + options?: RedisClusterClientOptions, + disableReconnect?: boolean + ): RedisClusterClientOptions | undefined { + let result: RedisClusterClientOptions | undefined; + if (this.#options.defaults) { + let socket; + if (this.#options.defaults.socket) { + socket = options?.socket ? { + ...this.#options.defaults.socket, + ...options.socket + } : this.#options.defaults.socket; + } else { + socket = options?.socket; + } - let node = this.#nodeByAddress.get(address); + result = { + ...this.#options.defaults, + ...options, + socket + }; + } else { + result = options; + } + + if (disableReconnect) { + result ??= {}; + result.socket ??= {}; + result.socket.reconnectStrategy = false; + } + + return result; + } + + #initiateSlotNode( + { id, ip, port }: ClusterSlotsNode, + readonly: boolean, + eagerConnent: boolean, + addressesInUse: Set, + promises: Array> + ) { + const address = `${ip}:${port}`; + addressesInUse.add(address); + + let node = this.nodeByAddress.get(address); if (!node) { node = { - id: nodeData.id, - client: this.#initiateClient({ - socket: this.#getNodeAddress(address) ?? { - host: nodeData.host, - port: nodeData.port - }, - readonly - }) + id, + host: ip, + port, + address, + readonly, + client: undefined }; - promises.push(node.client.connect()); - this.#nodeByAddress.set(address, node); + + if (eagerConnent) { + promises.push(this.#createNodeClient(node)); + } + + this.nodeByAddress.set(address, node); } + (readonly ? this.replicas : this.masters).push(node); + return node; } - getSlotMaster(slot: number): ClusterNode { - return this.#slots[slot].master; + async #createClient( + node: ShardNode, + readonly = node.readonly + ) { + const client = new this.#Client( + this.#clientOptionsDefaults({ + socket: this.#getNodeAddress(node.address) ?? { + host: node.host, + port: node.port + }, + readonly + }) + ); + client.on('error', err => this.#emit('error', err)); + + await client.connect(); + + return client; } - *#slotClientIterator(slotNumber: number): IterableIterator> { - const slot = this.#slots[slotNumber]; - yield slot.master.client; + #createNodeClient(node: ShardNode) { + const promise = this.#createClient(node) + .then(client => { + node.client = client; + return client; + }) + .catch(err => { + node.client = undefined; + throw err; + }); + node.client = promise; + return promise; + } - for (const replica of slot.replicas) { - yield replica.client; - } + nodeClient(node: ShardNode) { + return node.client ?? this.#createNodeClient(node); } - #getSlotClient(slotNumber: number): RedisClientType { - const slot = this.#slots[slotNumber]; - if (!slot.clientIterator) { - slot.clientIterator = this.#slotClientIterator(slotNumber); - } + #runningRediscoverPromise?: Promise; - const {done, value} = slot.clientIterator.next(); - if (done) { - slot.clientIterator = undefined; - return this.#getSlotClient(slotNumber); - } + async rediscover(startWith: RedisClientType): Promise { + this.#runningRediscoverPromise ??= this.#rediscover(startWith) + .finally(() => this.#runningRediscoverPromise = undefined); + return this.#runningRediscoverPromise; + } + + async #rediscover(startWith: RedisClientType): Promise { + if (await this.#discover(startWith.options)) return; - return value; + return this.#discoverWithRootNodes(); } - #randomClientIterator?: IterableIterator>; + quit(): Promise { + return this.#destroy(client => client.quit()); + } - #getRandomClient(): RedisClientType { - if (!this.#nodeByAddress.size) { - throw new Error('Cluster is not connected'); - } + disconnect(): Promise { + return this.#destroy(client => client.disconnect()); + } + + async #destroy(fn: (client: RedisClientType) => Promise): Promise { + this.#isOpen = false; + + const promises = []; + for (const { master, replicas } of this.shards) { + if (master.client) { + promises.push( + this.#execOnNodeClient(master.client, fn) + ); + } - if (!this.#randomClientIterator) { - this.#randomClientIterator = this.#nodeByAddress.values(); + if (master.pubSubClient) { + promises.push( + this.#execOnNodeClient(master.pubSubClient, fn) + ); + } + + if (replicas) { + for (const { client } of replicas) { + if (client) { + promises.push( + this.#execOnNodeClient(client, fn) + ); + } + } + } } - const {done, value} = this.#randomClientIterator.next(); - if (done) { - this.#randomClientIterator = undefined; - return this.#getRandomClient(); + if (this.pubSubNode) { + promises.push(this.#execOnNodeClient(this.pubSubNode.client, fn)); + this.pubSubNode = undefined; } - return value.client; + this.#resetSlots(); + this.nodeByAddress.clear(); + + await Promise.allSettled(promises); + } + + #execOnNodeClient( + client: ClientOrPromise, + fn: (client: RedisClientType) => Promise + ) { + return types.isPromise(client) ? + client.then(fn) : + fn(client); } - getClient(firstKey?: RedisCommandArgument, isReadonly?: boolean): RedisClientType { + getClient( + firstKey: RedisCommandArgument | undefined, + isReadonly: boolean | undefined + ): ClientOrPromise { if (!firstKey) { - return this.#getRandomClient(); + return this.nodeClient(this.getRandomNode()); } - const slot = calculateSlot(firstKey); - if (!isReadonly || !this.#options.useReplicas) { - return this.getSlotMaster(slot).client; + const slotNumber = calculateSlot(firstKey); + if (!isReadonly) { + return this.nodeClient(this.slots[slotNumber].master); } - return this.#getSlotClient(slot); + return this.nodeClient(this.getSlotRandomNode(slotNumber)); } - getMasters(): Array> { - const masters = []; - for (const node of this.#nodeByAddress.values()) { - if (node.client.options?.readonly) continue; + *#iterateAllNodes() { + let i = Math.floor(Math.random() * (this.masters.length + this.replicas.length)); + if (i < this.masters.length) { + do { + yield this.masters[i]; + } while (++i < this.masters.length); - masters.push(node); + for (const replica of this.replicas) { + yield replica; + } + } else { + i -= this.masters.length; + do { + yield this.replicas[i]; + } while (++i < this.replicas.length); } - return masters; + while (true) { + for (const master of this.masters) { + yield master; + } + + for (const replica of this.replicas) { + yield replica; + } + } } - getNodeByAddress(address: string): ClusterNode | undefined { - const mappedAddress = this.#getNodeAddress(address); - return this.#nodeByAddress.get( - mappedAddress ? `${mappedAddress.host}:${mappedAddress.port}` : address - ); + #randomNodeIterator?: IterableIterator>; + + getRandomNode() { + this.#randomNodeIterator ??= this.#iterateAllNodes(); + return this.#randomNodeIterator.next().value as ShardNode; } - quit(): Promise { - return this.#destroy(client => client.quit()); + *#slotNodesIterator(slot: ShardWithReplicas) { + let i = Math.floor(Math.random() * (1 + slot.replicas.length)); + if (i < slot.replicas.length) { + do { + yield slot.replicas[i]; + } while (++i < slot.replicas.length); + } + + while (true) { + yield slot.master; + + for (const replica of slot.replicas) { + yield replica; + } + } } - disconnect(): Promise { - return this.#destroy(client => client.disconnect()); + getSlotRandomNode(slotNumber: number) { + const slot = this.slots[slotNumber]; + if (!slot.replicas?.length) { + return slot.master; + } + + slot.nodesIterator ??= this.#slotNodesIterator(slot as ShardWithReplicas); + return slot.nodesIterator.next().value as ShardNode; } - async #destroy(fn: (client: RedisClientType) => Promise): Promise { - const promises = []; - for (const { client } of this.#nodeByAddress.values()) { - promises.push(fn(client)); + getMasterByAddress(address: string) { + const master = this.nodeByAddress.get(address); + if (!master) return; + + return this.nodeClient(master); + } + + getPubSubClient() { + return this.pubSubNode ? + this.pubSubNode.client : + this.#initiatePubSubClient(); + } + + async #initiatePubSubClient(toResubscribe?: PubSubToResubscribe) { + const index = Math.floor(Math.random() * (this.masters.length + this.replicas.length)), + node = index < this.masters.length ? + this.masters[index] : + this.replicas[index - this.masters.length]; + + this.pubSubNode = { + address: node.address, + client: this.#createClient(node, true) + .then(async client => { + if (toResubscribe) { + await Promise.all([ + client.extendPubSubListeners(PubSubType.CHANNELS, toResubscribe[PubSubType.CHANNELS]), + client.extendPubSubListeners(PubSubType.PATTERNS, toResubscribe[PubSubType.PATTERNS]) + ]); + } + + this.pubSubNode!.client = client; + return client; + }) + .catch(err => { + this.pubSubNode = undefined; + throw err; + }) + }; + + return this.pubSubNode.client as Promise>; + } + + async executeUnsubscribeCommand( + unsubscribe: (client: RedisClientType) => Promise + ): Promise { + const client = await this.getPubSubClient(); + await unsubscribe(client); + + if (!client.isPubSubActive) { + await client.disconnect(); + this.pubSubNode = undefined; } + } + + getShardedPubSubClient(channel: string) { + const { master } = this.slots[calculateSlot(channel)]; + return master.pubSubClient ?? this.#initiateShardedPubSubClient(master); + } + + #initiateShardedPubSubClient(master: MasterNode) { + const promise = this.#createClient(master, true) + .then(client => { + client.on('server-sunsubscribe', async (channel, listeners) => { + try { + await this.rediscover(client); + const redirectTo = await this.getShardedPubSubClient(channel); + redirectTo.extendPubSubChannelListeners( + PubSubType.SHARDED, + channel, + listeners + ); + } catch (err) { + this.#emit('sharded-shannel-moved-error', err, channel, listeners); + } + }); + + master.pubSubClient = client; + return client; + }) + .catch(err => { + master.pubSubClient = undefined; + throw err; + }); + + master.pubSubClient = promise; + + return promise; + } - await Promise.all(promises); + async executeShardedUnsubscribeCommand( + channel: string, + unsubscribe: (client: RedisClientType) => Promise + ): Promise { + const { master } = this.slots[calculateSlot(channel)]; + if (!master.pubSubClient) return Promise.resolve(); - this.#nodeByAddress.clear(); - this.#slots.splice(0); + const client = await master.pubSubClient; + await unsubscribe(client); + + if (!client.isPubSubActive) { + await client.disconnect(); + master.pubSubClient = undefined; + } } } diff --git a/packages/client/lib/cluster/commands.ts b/packages/client/lib/cluster/commands.ts index 8edbd1e389..58fa651be1 100644 --- a/packages/client/lib/cluster/commands.ts +++ b/packages/client/lib/cluster/commands.ts @@ -135,6 +135,7 @@ import * as SORT_RO from '../commands/SORT_RO'; import * as SORT_STORE from '../commands/SORT_STORE'; import * as SORT from '../commands/SORT'; import * as SPOP from '../commands/SPOP'; +import * as SPUBLISH from '../commands/SPUBLISH'; import * as SRANDMEMBER_COUNT from '../commands/SRANDMEMBER_COUNT'; import * as SRANDMEMBER from '../commands/SRANDMEMBER'; import * as SREM from '../commands/SREM'; @@ -483,6 +484,8 @@ export default { sort: SORT, SPOP, sPop: SPOP, + SPUBLISH, + sPublish: SPUBLISH, SRANDMEMBER_COUNT, sRandMemberCount: SRANDMEMBER_COUNT, SRANDMEMBER, diff --git a/packages/client/lib/cluster/index.spec.ts b/packages/client/lib/cluster/index.spec.ts index a2981d824e..6b9d35f75f 100644 --- a/packages/client/lib/cluster/index.spec.ts +++ b/packages/client/lib/cluster/index.spec.ts @@ -1,25 +1,29 @@ import { strict as assert } from 'assert'; -import testUtils, { GLOBAL } from '../test-utils'; +import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils'; import RedisCluster from '.'; import { ClusterSlotStates } from '../commands/CLUSTER_SETSLOT'; import { SQUARE_SCRIPT } from '../client/index.spec'; import { RootNodesUnavailableError } from '../errors'; - -// We need to use 'require', because it's not possible with Typescript to import -// function that are exported as 'module.exports = function`, without esModuleInterop -// set to true. -const calculateSlot = require('cluster-key-slot'); +import { spy } from 'sinon'; +import { promiseTimeout } from '../utils'; +import RedisClient from '../client'; describe('Cluster', () => { testUtils.testWithCluster('sendCommand', async cluster => { - await cluster.publish('channel', 'message'); - await cluster.set('a', 'b'); - await cluster.set('a{a}', 'bb'); - await cluster.set('aa', 'bb'); - await cluster.get('aa'); - await cluster.get('aa'); - await cluster.get('aa'); - await cluster.get('aa'); + assert.equal( + await cluster.sendCommand(undefined, true, ['PING']), + 'PONG' + ); + }, GLOBAL.CLUSTERS.OPEN); + + testUtils.testWithCluster('isOpen', async cluster => { + assert.equal(cluster.isOpen, true); + await cluster.disconnect(); + assert.equal(cluster.isOpen, false); + }, GLOBAL.CLUSTERS.OPEN); + + testUtils.testWithCluster('connect should throw if already connected', async cluster => { + await assert.rejects(cluster.connect()); }, GLOBAL.CLUSTERS.OPEN); testUtils.testWithCluster('multi', async cluster => { @@ -64,54 +68,279 @@ describe('Cluster', () => { }); testUtils.testWithCluster('should handle live resharding', async cluster => { - const key = 'key', + const slot = 12539, + key = 'key', value = 'value'; await cluster.set(key, value); - const slot = calculateSlot(key), - source = cluster.getSlotMaster(slot), - destination = cluster.getMasters().find(node => node.id !== source.id)!; + const importing = cluster.slots[0].master, + migrating = cluster.slots[slot].master, + [ importingClient, migratingClient ] = await Promise.all([ + cluster.nodeClient(importing), + cluster.nodeClient(migrating) + ]); await Promise.all([ - source.client.clusterSetSlot(slot, ClusterSlotStates.MIGRATING, destination.id), - destination.client.clusterSetSlot(slot, ClusterSlotStates.IMPORTING, destination.id) + importingClient.clusterSetSlot(slot, ClusterSlotStates.IMPORTING, migrating.id), + migratingClient.clusterSetSlot(slot, ClusterSlotStates.MIGRATING, importing.id) ]); - // should be able to get the key from the source node using "ASKING" + // should be able to get the key from the migrating node assert.equal( await cluster.get(key), value ); - await Promise.all([ - source.client.migrate( - '127.0.0.1', - (destination.client.options).socket.port, - key, - 0, - 10 - ) - ]); + await migratingClient.migrate( + importing.host, + importing.port, + key, + 0, + 10 + ); - // should be able to get the key from the destination node using the "ASKING" command + // should be able to get the key from the importing node using `ASKING` assert.equal( await cluster.get(key), value ); - await Promise.all( - cluster.getMasters().map(({ client }) => { - return client.clusterSetSlot(slot, ClusterSlotStates.NODE, destination.id); - }) - ); + await Promise.all([ + importingClient.clusterSetSlot(slot, ClusterSlotStates.NODE, importing.id), + migratingClient.clusterSetSlot(slot, ClusterSlotStates.NODE, importing.id), + ]); - // should handle "MOVED" errors + // should handle `MOVED` errors assert.equal( await cluster.get(key), value ); }, { serverArguments: [], - numberOfNodes: 2 + numberOfMasters: 2 + }); + + testUtils.testWithCluster('getRandomNode should spread the the load evenly', async cluster => { + const totalNodes = cluster.masters.length + cluster.replicas.length, + ids = new Set(); + for (let i = 0; i < totalNodes; i++) { + ids.add(cluster.getRandomNode().id); + } + + assert.equal(ids.size, totalNodes); + }, GLOBAL.CLUSTERS.WITH_REPLICAS); + + testUtils.testWithCluster('getSlotRandomNode should spread the the load evenly', async cluster => { + const totalNodes = 1 + cluster.slots[0].replicas!.length, + ids = new Set(); + for (let i = 0; i < totalNodes; i++) { + ids.add(cluster.getSlotRandomNode(0).id); + } + + assert.equal(ids.size, totalNodes); + }, GLOBAL.CLUSTERS.WITH_REPLICAS); + + testUtils.testWithCluster('cluster topology', async cluster => { + assert.equal(cluster.slots.length, 16384); + const { numberOfMasters, numberOfReplicas } = GLOBAL.CLUSTERS.WITH_REPLICAS; + assert.equal(cluster.shards.length, numberOfMasters); + assert.equal(cluster.masters.length, numberOfMasters); + assert.equal(cluster.replicas.length, numberOfReplicas * numberOfMasters); + assert.equal(cluster.nodeByAddress.size, numberOfMasters + numberOfMasters * numberOfReplicas); + }, GLOBAL.CLUSTERS.WITH_REPLICAS); + + testUtils.testWithCluster('getMasters should be backwards competiable (without `minimizeConnections`)', async cluster => { + const masters = cluster.getMasters(); + assert.ok(Array.isArray(masters)); + for (const master of masters) { + assert.equal(typeof master.id, 'string'); + assert.ok(master.client instanceof RedisClient); + } + }, { + ...GLOBAL.CLUSTERS.OPEN, + clusterConfiguration: { + minimizeConnections: undefined // reset to default + } + }); + + testUtils.testWithCluster('getSlotMaster should be backwards competiable (without `minimizeConnections`)', async cluster => { + const master = cluster.getSlotMaster(0); + assert.equal(typeof master.id, 'string'); + assert.ok(master.client instanceof RedisClient); + }, { + ...GLOBAL.CLUSTERS.OPEN, + clusterConfiguration: { + minimizeConnections: undefined // reset to default + } + }); + + testUtils.testWithCluster('should throw CROSSSLOT error', async cluster => { + await assert.rejects(cluster.mGet(['a', 'b'])); + }, GLOBAL.CLUSTERS.OPEN); + + describe('minimizeConnections', () => { + testUtils.testWithCluster('false', async cluster => { + for (const master of cluster.masters) { + assert.ok(master.client instanceof RedisClient); + } + }, { + ...GLOBAL.CLUSTERS.OPEN, + clusterConfiguration: { + minimizeConnections: false + } + }); + + testUtils.testWithCluster('true', async cluster => { + for (const master of cluster.masters) { + assert.equal(master.client, undefined); + } + }, { + ...GLOBAL.CLUSTERS.OPEN, + clusterConfiguration: { + minimizeConnections: true + } + }); + }); + + describe('PubSub', () => { + testUtils.testWithCluster('subscribe & unsubscribe', async cluster => { + const listener = spy(); + + await cluster.subscribe('channel', listener); + + await Promise.all([ + waitTillBeenCalled(listener), + cluster.publish('channel', 'message') + ]); + + assert.ok(listener.calledOnceWithExactly('message', 'channel')); + + await cluster.unsubscribe('channel', listener); + + assert.equal(cluster.pubSubNode, undefined); + }, GLOBAL.CLUSTERS.OPEN); + + testUtils.testWithCluster('psubscribe & punsubscribe', async cluster => { + const listener = spy(); + + await cluster.pSubscribe('channe*', listener); + + await Promise.all([ + waitTillBeenCalled(listener), + cluster.publish('channel', 'message') + ]); + + assert.ok(listener.calledOnceWithExactly('message', 'channel')); + + await cluster.pUnsubscribe('channe*', listener); + + assert.equal(cluster.pubSubNode, undefined); + }, GLOBAL.CLUSTERS.OPEN); + + testUtils.testWithCluster('should move listeners when PubSub node disconnects from the cluster', async cluster => { + const listener = spy(); + await cluster.subscribe('channel', listener); + + assert.ok(cluster.pubSubNode); + const [ migrating, importing ] = cluster.masters[0].address === cluster.pubSubNode.address ? + cluster.masters : + [cluster.masters[1], cluster.masters[0]], + [ migratingClient, importingClient ] = await Promise.all([ + cluster.nodeClient(migrating), + cluster.nodeClient(importing) + ]); + + const range = cluster.slots[0].master === migrating ? { + key: 'bar', // 5061 + start: 0, + end: 8191 + } : { + key: 'foo', // 12182 + start: 8192, + end: 16383 + }; + + await Promise.all([ + migratingClient.clusterDelSlotsRange(range), + importingClient.clusterDelSlotsRange(range), + importingClient.clusterAddSlotsRange(range) + ]); + + // wait for migrating node to be notified about the new topology + while ((await migratingClient.clusterInfo()).state !== 'ok') { + await promiseTimeout(50); + } + + // make sure to cause `MOVED` error + await cluster.get(range.key); + + await Promise.all([ + cluster.publish('channel', 'message'), + waitTillBeenCalled(listener) + ]); + + assert.ok(listener.calledOnceWithExactly('message', 'channel')); + }, { + serverArguments: [], + numberOfMasters: 2, + minimumDockerVersion: [7] + }); + + testUtils.testWithCluster('ssubscribe & sunsubscribe', async cluster => { + const listener = spy(); + + await cluster.sSubscribe('channel', listener); + + await Promise.all([ + waitTillBeenCalled(listener), + cluster.sPublish('channel', 'message') + ]); + + assert.ok(listener.calledOnceWithExactly('message', 'channel')); + + await cluster.sUnsubscribe('channel', listener); + + // 10328 is the slot of `channel` + assert.equal(cluster.slots[10328].master.pubSubClient, undefined); + }, { + ...GLOBAL.CLUSTERS.OPEN, + minimumDockerVersion: [7] + }); + + testUtils.testWithCluster('should handle sharded-channel-moved events', async cluster => { + const SLOT = 10328, + migrating = cluster.slots[SLOT].master, + importing = cluster.masters.find(master => master !== migrating)!, + [ migratingClient, importingClient ] = await Promise.all([ + cluster.nodeClient(migrating), + cluster.nodeClient(importing) + ]); + + await Promise.all([ + migratingClient.clusterDelSlots(SLOT), + importingClient.clusterDelSlots(SLOT), + importingClient.clusterAddSlots(SLOT) + ]); + + // wait for migrating node to be notified about the new topology + while ((await migratingClient.clusterInfo()).state !== 'ok') { + await promiseTimeout(50); + } + + const listener = spy(); + + // will trigger `MOVED` error + await cluster.sSubscribe('channel', listener); + + await Promise.all([ + waitTillBeenCalled(listener), + cluster.sPublish('channel', 'message') + ]); + + assert.ok(listener.calledOnceWithExactly('message', 'channel')); + }, { + serverArguments: [], + minimumDockerVersion: [7] + }); }); }); diff --git a/packages/client/lib/cluster/index.ts b/packages/client/lib/cluster/index.ts index 6eafdda86c..818930c8c8 100644 --- a/packages/client/lib/cluster/index.ts +++ b/packages/client/lib/cluster/index.ts @@ -1,11 +1,13 @@ import COMMANDS from './commands'; import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisFunctions, RedisModules, RedisExtensions, RedisScript, RedisScripts, RedisCommandSignature, RedisFunction } from '../commands'; import { ClientCommandOptions, RedisClientOptions, RedisClientType, WithFunctions, WithModules, WithScripts } from '../client'; -import RedisClusterSlots, { ClusterNode, NodeAddressMap } from './cluster-slots'; +import RedisClusterSlots, { NodeAddressMap, ShardNode } from './cluster-slots'; import { attachExtensions, transformCommandReply, attachCommands, transformCommandArguments } from '../commander'; import { EventEmitter } from 'events'; import RedisClusterMultiCommand, { InstantiableRedisClusterMultiCommandType, RedisClusterMultiCommandType } from './multi-command'; import { RedisMultiQueuedCommand } from '../multi-command'; +import { PubSubListener } from '../client/pub-sub'; +import { ErrorReply } from '../errors'; export type RedisClusterClientOptions = Omit< RedisClientOptions, @@ -17,10 +19,34 @@ export interface RedisClusterOptions< F extends RedisFunctions = Record, S extends RedisScripts = Record > extends RedisExtensions { + /** + * Should contain details for some of the cluster nodes that the client will use to discover + * the "cluster topology". We recommend including details for at least 3 nodes here. + */ rootNodes: Array; + /** + * Default values used for every client in the cluster. Use this to specify global values, + * for example: ACL credentials, timeouts, TLS configuration etc. + */ defaults?: Partial; + /** + * When `true`, `.connect()` will only discover the cluster topology, without actually connecting to all the nodes. + * Useful for short-term or PubSub-only connections. + */ + minimizeConnections?: boolean; + /** + * When `true`, distribute load by executing readonly commands (such as `GET`, `GEOSEARCH`, etc.) across all cluster nodes. When `false`, only use master nodes. + */ useReplicas?: boolean; + /** + * The maximum number of times a command will be redirected due to `MOVED` or `ASK` errors. + */ maxCommandRedirections?: number; + /** + * Mapping between the addresses in the cluster (see `CLUSTER SHARDS`) and the addresses the client should connect to + * Useful when the cluster is running on another network + * + */ nodeAddressMap?: NodeAddressMap; } @@ -70,14 +96,44 @@ export default class RedisCluster< } readonly #options: RedisClusterOptions; + readonly #slots: RedisClusterSlots; + + get slots() { + return this.#slots.slots; + } + + get shards() { + return this.#slots.shards; + } + + get masters() { + return this.#slots.masters; + } + + get replicas() { + return this.#slots.replicas; + } + + get nodeByAddress() { + return this.#slots.nodeByAddress; + } + + get pubSubNode() { + return this.#slots.pubSubNode; + } + readonly #Multi: InstantiableRedisClusterMultiCommandType; + get isOpen() { + return this.#slots.isOpen; + } + constructor(options: RedisClusterOptions) { super(); this.#options = options; - this.#slots = new RedisClusterSlots(options, err => this.emit('error', err)); + this.#slots = new RedisClusterSlots(options, this.emit.bind(this)); this.#Multi = RedisClusterMultiCommand.extend(options); } @@ -88,7 +144,7 @@ export default class RedisCluster< }); } - async connect(): Promise { + connect() { return this.#slots.connect(); } @@ -188,34 +244,33 @@ export default class RedisCluster< executor: (client: RedisClientType) => Promise ): Promise { const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16; - let client = this.#slots.getClient(firstKey, isReadonly); + let client = await this.#slots.getClient(firstKey, isReadonly); for (let i = 0;; i++) { try { return await executor(client); } catch (err) { - if (++i > maxCommandRedirections || !(err instanceof Error)) { + if (++i > maxCommandRedirections || !(err instanceof ErrorReply)) { throw err; } if (err.message.startsWith('ASK')) { const address = err.message.substring(err.message.lastIndexOf(' ') + 1); - if (this.#slots.getNodeByAddress(address)?.client === client) { - await client.asking(); - continue; + let redirectTo = await this.#slots.getMasterByAddress(address); + if (!redirectTo) { + await this.#slots.rediscover(client); + redirectTo = await this.#slots.getMasterByAddress(address); } - await this.#slots.rediscover(client); - const redirectTo = this.#slots.getNodeByAddress(address); if (!redirectTo) { throw new Error(`Cannot find node ${address}`); } - await redirectTo.client.asking(); - client = redirectTo.client; + await redirectTo.asking(); + client = redirectTo; continue; } else if (err.message.startsWith('MOVED')) { await this.#slots.rediscover(client); - client = this.#slots.getClient(firstKey, isReadonly); + client = await this.#slots.getClient(firstKey, isReadonly); continue; } @@ -239,14 +294,94 @@ export default class RedisCluster< multi = this.MULTI; - getMasters(): Array> { - return this.#slots.getMasters(); + async SUBSCRIBE( + channels: string | Array, + listener: PubSubListener, + bufferMode?: T + ) { + return (await this.#slots.getPubSubClient()) + .SUBSCRIBE(channels, listener, bufferMode); + } + + subscribe = this.SUBSCRIBE; + + async UNSUBSCRIBE( + channels?: string | Array, + listener?: PubSubListener, + bufferMode?: T + ) { + return this.#slots.executeUnsubscribeCommand(client => + client.UNSUBSCRIBE(channels, listener, bufferMode) + ); + } + + unsubscribe = this.UNSUBSCRIBE; + + async PSUBSCRIBE( + patterns: string | Array, + listener: PubSubListener, + bufferMode?: T + ) { + return (await this.#slots.getPubSubClient()) + .PSUBSCRIBE(patterns, listener, bufferMode); } - getSlotMaster(slot: number): ClusterNode { - return this.#slots.getSlotMaster(slot); + pSubscribe = this.PSUBSCRIBE; + + async PUNSUBSCRIBE( + patterns?: string | Array, + listener?: PubSubListener, + bufferMode?: T + ) { + return this.#slots.executeUnsubscribeCommand(client => + client.PUNSUBSCRIBE(patterns, listener, bufferMode) + ); } + pUnsubscribe = this.PUNSUBSCRIBE; + + async SSUBSCRIBE( + channels: string | Array, + listener: PubSubListener, + bufferMode?: T + ) { + const maxCommandRedirections = this.#options.maxCommandRedirections ?? 16, + firstChannel = Array.isArray(channels) ? channels[0] : channels; + let client = await this.#slots.getShardedPubSubClient(firstChannel); + for (let i = 0;; i++) { + try { + return await client.SSUBSCRIBE(channels, listener, bufferMode); + } catch (err) { + if (++i > maxCommandRedirections || !(err instanceof ErrorReply)) { + throw err; + } + + if (err.message.startsWith('MOVED')) { + await this.#slots.rediscover(client); + client = await this.#slots.getShardedPubSubClient(firstChannel); + continue; + } + + throw err; + } + } + } + + sSubscribe = this.SSUBSCRIBE; + + SUNSUBSCRIBE( + channels: string | Array, + listener: PubSubListener, + bufferMode?: T + ) { + return this.#slots.executeShardedUnsubscribeCommand( + Array.isArray(channels) ? channels[0] : channels, + client => client.SUNSUBSCRIBE(channels, listener, bufferMode) + ); + } + + sUnsubscribe = this.SUNSUBSCRIBE; + quit(): Promise { return this.#slots.quit(); } @@ -254,6 +389,32 @@ export default class RedisCluster< disconnect(): Promise { return this.#slots.disconnect(); } + + nodeClient(node: ShardNode) { + return this.#slots.nodeClient(node); + } + + getRandomNode() { + return this.#slots.getRandomNode(); + } + + getSlotRandomNode(slot: number) { + return this.#slots.getSlotRandomNode(slot); + } + + /** + * @deprecated use `.masters` instead + */ + getMasters() { + return this.masters; + } + + /** + * @deprecated use `.slots[]` instead + */ + getSlotMaster(slot: number) { + return this.slots[slot].master; + } } attachCommands({ diff --git a/packages/client/lib/commands/CLUSTER_BUMPEPOCH.spec.ts b/packages/client/lib/commands/CLUSTER_BUMPEPOCH.spec.ts index f9d2f5437b..edb68b3b3b 100644 --- a/packages/client/lib/commands/CLUSTER_BUMPEPOCH.spec.ts +++ b/packages/client/lib/commands/CLUSTER_BUMPEPOCH.spec.ts @@ -11,8 +11,9 @@ describe('CLUSTER BUMPEPOCH', () => { }); testUtils.testWithCluster('clusterNode.clusterBumpEpoch', async cluster => { + const client = await cluster.nodeClient(cluster.masters[0]); assert.equal( - typeof await cluster.getSlotMaster(0).client.clusterBumpEpoch(), + typeof await client.clusterBumpEpoch(), 'string' ); }, GLOBAL.SERVERS.OPEN); diff --git a/packages/client/lib/commands/CLUSTER_COUNT-FAILURE-REPORTS.spec.ts b/packages/client/lib/commands/CLUSTER_COUNT-FAILURE-REPORTS.spec.ts index d84687631b..558110d0a2 100644 --- a/packages/client/lib/commands/CLUSTER_COUNT-FAILURE-REPORTS.spec.ts +++ b/packages/client/lib/commands/CLUSTER_COUNT-FAILURE-REPORTS.spec.ts @@ -11,7 +11,7 @@ describe('CLUSTER COUNT-FAILURE-REPORTS', () => { }); testUtils.testWithCluster('clusterNode.clusterCountFailureReports', async cluster => { - const { client } = cluster.getSlotMaster(0); + const client = await cluster.nodeClient(cluster.masters[0]); assert.equal( typeof await client.clusterCountFailureReports( await client.clusterMyId() diff --git a/packages/client/lib/commands/CLUSTER_COUNTKEYSINSLOT.spec.ts b/packages/client/lib/commands/CLUSTER_COUNTKEYSINSLOT.spec.ts index ecaed428cb..27ecbcfffa 100644 --- a/packages/client/lib/commands/CLUSTER_COUNTKEYSINSLOT.spec.ts +++ b/packages/client/lib/commands/CLUSTER_COUNTKEYSINSLOT.spec.ts @@ -11,8 +11,9 @@ describe('CLUSTER COUNTKEYSINSLOT', () => { }); testUtils.testWithCluster('clusterNode.clusterCountKeysInSlot', async cluster => { + const client = await cluster.nodeClient(cluster.masters[0]); assert.equal( - typeof await cluster.getSlotMaster(0).client.clusterCountKeysInSlot(0), + typeof await client.clusterCountKeysInSlot(0), 'number' ); }, GLOBAL.CLUSTERS.OPEN); diff --git a/packages/client/lib/commands/CLUSTER_GETKEYSINSLOT.spec.ts b/packages/client/lib/commands/CLUSTER_GETKEYSINSLOT.spec.ts index 7c15634130..957b7de20c 100644 --- a/packages/client/lib/commands/CLUSTER_GETKEYSINSLOT.spec.ts +++ b/packages/client/lib/commands/CLUSTER_GETKEYSINSLOT.spec.ts @@ -11,7 +11,8 @@ describe('CLUSTER GETKEYSINSLOT', () => { }); testUtils.testWithCluster('clusterNode.clusterGetKeysInSlot', async cluster => { - const reply = await cluster.getSlotMaster(0).client.clusterGetKeysInSlot(0, 1); + const client = await cluster.nodeClient(cluster.masters[0]), + reply = await client.clusterGetKeysInSlot(0, 1); assert.ok(Array.isArray(reply)); for (const item of reply) { assert.equal(typeof item, 'string'); diff --git a/packages/client/lib/commands/CLUSTER_INFO.spec.ts b/packages/client/lib/commands/CLUSTER_INFO.spec.ts index b770ed3361..69d5c4a8c5 100644 --- a/packages/client/lib/commands/CLUSTER_INFO.spec.ts +++ b/packages/client/lib/commands/CLUSTER_INFO.spec.ts @@ -46,8 +46,9 @@ describe('CLUSTER INFO', () => { }); testUtils.testWithCluster('clusterNode.clusterInfo', async cluster => { + const client = await cluster.nodeClient(cluster.masters[0]); assert.notEqual( - await cluster.getSlotMaster(0).client.clusterInfo(), + await client.clusterInfo(), null ); }, GLOBAL.CLUSTERS.OPEN); diff --git a/packages/client/lib/commands/CLUSTER_KEYSLOT.spec.ts b/packages/client/lib/commands/CLUSTER_KEYSLOT.spec.ts index a7a5ab9472..3bbc9f9cb2 100644 --- a/packages/client/lib/commands/CLUSTER_KEYSLOT.spec.ts +++ b/packages/client/lib/commands/CLUSTER_KEYSLOT.spec.ts @@ -11,8 +11,9 @@ describe('CLUSTER KEYSLOT', () => { }); testUtils.testWithCluster('clusterNode.clusterKeySlot', async cluster => { + const client = await cluster.nodeClient(cluster.masters[0]); assert.equal( - typeof await cluster.getSlotMaster(0).client.clusterKeySlot('key'), + typeof await client.clusterKeySlot('key'), 'number' ); }, GLOBAL.CLUSTERS.OPEN); diff --git a/packages/client/lib/commands/CLUSTER_LINKS.spec.ts b/packages/client/lib/commands/CLUSTER_LINKS.spec.ts index a8b1663c3a..982973e8ea 100644 --- a/packages/client/lib/commands/CLUSTER_LINKS.spec.ts +++ b/packages/client/lib/commands/CLUSTER_LINKS.spec.ts @@ -13,7 +13,8 @@ describe('CLUSTER LINKS', () => { }); testUtils.testWithCluster('clusterNode.clusterLinks', async cluster => { - const links = await cluster.getSlotMaster(0).client.clusterLinks(); + const client = await cluster.nodeClient(cluster.masters[0]), + links = await client.clusterLinks(); assert.ok(Array.isArray(links)); for (const link of links) { assert.equal(typeof link.direction, 'string'); diff --git a/packages/client/lib/commands/CLUSTER_MYID.spec.ts b/packages/client/lib/commands/CLUSTER_MYID.spec.ts index 7781c37452..f427d7058e 100644 --- a/packages/client/lib/commands/CLUSTER_MYID.spec.ts +++ b/packages/client/lib/commands/CLUSTER_MYID.spec.ts @@ -11,9 +11,11 @@ describe('CLUSTER MYID', () => { }); testUtils.testWithCluster('clusterNode.clusterMyId', async cluster => { + const [master] = cluster.masters, + client = await cluster.nodeClient(master); assert.equal( - typeof await cluster.getSlotMaster(0).client.clusterMyId(), - 'string' + await client.clusterMyId(), + master.id ); }, GLOBAL.CLUSTERS.OPEN); }); diff --git a/packages/client/lib/commands/CLUSTER_SAVECONFIG.spec.ts b/packages/client/lib/commands/CLUSTER_SAVECONFIG.spec.ts index bcdccd9091..81ba4aa250 100644 --- a/packages/client/lib/commands/CLUSTER_SAVECONFIG.spec.ts +++ b/packages/client/lib/commands/CLUSTER_SAVECONFIG.spec.ts @@ -11,8 +11,9 @@ describe('CLUSTER SAVECONFIG', () => { }); testUtils.testWithCluster('clusterNode.clusterSaveConfig', async cluster => { + const client = await cluster.nodeClient(cluster.masters[0]); assert.equal( - await cluster.getSlotMaster(0).client.clusterSaveConfig(), + await client.clusterSaveConfig(), 'OK' ); }, GLOBAL.CLUSTERS.OPEN); diff --git a/packages/client/lib/commands/CLUSTER_SLOTS.ts b/packages/client/lib/commands/CLUSTER_SLOTS.ts index 7e1f5dcc96..20d9782dd9 100644 --- a/packages/client/lib/commands/CLUSTER_SLOTS.ts +++ b/packages/client/lib/commands/CLUSTER_SLOTS.ts @@ -13,7 +13,7 @@ type ClusterSlotsRawReply = Array<[ ...replicas: Array ]>; -type ClusterSlotsNode = { +export interface ClusterSlotsNode { ip: string; port: number; id: string; diff --git a/packages/client/lib/commands/PING.spec.ts b/packages/client/lib/commands/PING.spec.ts index fae349176d..06cbae43a1 100644 --- a/packages/client/lib/commands/PING.spec.ts +++ b/packages/client/lib/commands/PING.spec.ts @@ -1,8 +1,24 @@ import { strict as assert } from 'assert'; import testUtils, { GLOBAL } from '../test-utils'; -import RedisClient from '../client'; +import { transformArguments } from './PING'; describe('PING', () => { + describe('transformArguments', () => { + it('default', () => { + assert.deepEqual( + transformArguments(), + ['PING'] + ); + }); + + it('with message', () => { + assert.deepEqual( + transformArguments('message'), + ['PING', 'message'] + ); + }); + }); + describe('client.ping', () => { testUtils.testWithClient('string', async client => { assert.equal( @@ -13,7 +29,7 @@ describe('PING', () => { testUtils.testWithClient('buffer', async client => { assert.deepEqual( - await client.ping(RedisClient.commandOptions({ returnBuffers: true })), + await client.ping(client.commandOptions({ returnBuffers: true })), Buffer.from('PONG') ); }, GLOBAL.SERVERS.OPEN); diff --git a/packages/client/lib/commands/PING.ts b/packages/client/lib/commands/PING.ts index 10ab01f7bd..95fa006122 100644 --- a/packages/client/lib/commands/PING.ts +++ b/packages/client/lib/commands/PING.ts @@ -1,7 +1,12 @@ -import { RedisCommandArgument } from '.'; +import { RedisCommandArgument, RedisCommandArguments } from '.'; -export function transformArguments(): Array { - return ['PING']; +export function transformArguments(message?: RedisCommandArgument): RedisCommandArguments { + const args: RedisCommandArguments = ['PING']; + if (message) { + args.push(message); + } + + return args; } export declare function transformReply(): RedisCommandArgument; diff --git a/packages/client/lib/commands/PUBLISH.ts b/packages/client/lib/commands/PUBLISH.ts index 93a8016900..7862a0936c 100644 --- a/packages/client/lib/commands/PUBLISH.ts +++ b/packages/client/lib/commands/PUBLISH.ts @@ -1,5 +1,7 @@ import { RedisCommandArgument, RedisCommandArguments } from '.'; +export const IS_READ_ONLY = true; + export function transformArguments( channel: RedisCommandArgument, message: RedisCommandArgument diff --git a/packages/client/lib/commands/PUBSUB_SHARDCHANNELS.spec.ts b/packages/client/lib/commands/PUBSUB_SHARDCHANNELS.spec.ts new file mode 100644 index 0000000000..1e5f2292b3 --- /dev/null +++ b/packages/client/lib/commands/PUBSUB_SHARDCHANNELS.spec.ts @@ -0,0 +1,30 @@ +import { strict as assert } from 'assert'; +import testUtils, { GLOBAL } from '../test-utils'; +import { transformArguments } from './PUBSUB_SHARDCHANNELS'; + +describe('PUBSUB SHARDCHANNELS', () => { + testUtils.isVersionGreaterThanHook([7]); + + describe('transformArguments', () => { + it('without pattern', () => { + assert.deepEqual( + transformArguments(), + ['PUBSUB', 'SHARDCHANNELS'] + ); + }); + + it('with pattern', () => { + assert.deepEqual( + transformArguments('patter*'), + ['PUBSUB', 'SHARDCHANNELS', 'patter*'] + ); + }); + }); + + testUtils.testWithClient('client.pubSubShardChannels', async client => { + assert.deepEqual( + await client.pubSubShardChannels(), + [] + ); + }, GLOBAL.SERVERS.OPEN); +}); diff --git a/packages/client/lib/commands/PUBSUB_SHARDCHANNELS.ts b/packages/client/lib/commands/PUBSUB_SHARDCHANNELS.ts new file mode 100644 index 0000000000..e998677848 --- /dev/null +++ b/packages/client/lib/commands/PUBSUB_SHARDCHANNELS.ts @@ -0,0 +1,13 @@ +import { RedisCommandArgument, RedisCommandArguments } from '.'; + +export const IS_READ_ONLY = true; + +export function transformArguments( + pattern?: RedisCommandArgument +): RedisCommandArguments { + const args: RedisCommandArguments = ['PUBSUB', 'SHARDCHANNELS']; + if (pattern) args.push(pattern); + return args; +} + +export declare function transformReply(): Array; diff --git a/packages/client/lib/commands/SPUBLISH.spec.ts b/packages/client/lib/commands/SPUBLISH.spec.ts new file mode 100644 index 0000000000..60b6ce2dad --- /dev/null +++ b/packages/client/lib/commands/SPUBLISH.spec.ts @@ -0,0 +1,21 @@ +import { strict as assert } from 'assert'; +import testUtils, { GLOBAL } from '../test-utils'; +import { transformArguments } from './SPUBLISH'; + +describe('SPUBLISH', () => { + testUtils.isVersionGreaterThanHook([7]); + + it('transformArguments', () => { + assert.deepEqual( + transformArguments('channel', 'message'), + ['SPUBLISH', 'channel', 'message'] + ); + }); + + testUtils.testWithClient('client.sPublish', async client => { + assert.equal( + await client.sPublish('channel', 'message'), + 0 + ); + }, GLOBAL.SERVERS.OPEN); +}); diff --git a/packages/client/lib/commands/SPUBLISH.ts b/packages/client/lib/commands/SPUBLISH.ts new file mode 100644 index 0000000000..42a7ab4907 --- /dev/null +++ b/packages/client/lib/commands/SPUBLISH.ts @@ -0,0 +1,14 @@ +import { RedisCommandArgument, RedisCommandArguments } from '.'; + +export const IS_READ_ONLY = true; + +export const FIRST_KEY_INDEX = 1; + +export function transformArguments( + channel: RedisCommandArgument, + message: RedisCommandArgument +): RedisCommandArguments { + return ['SPUBLISH', channel, message]; +} + +export declare function transformReply(): number; diff --git a/packages/client/lib/commands/generic-transformers.ts b/packages/client/lib/commands/generic-transformers.ts index d3a57a9346..5048de9399 100644 --- a/packages/client/lib/commands/generic-transformers.ts +++ b/packages/client/lib/commands/generic-transformers.ts @@ -137,7 +137,6 @@ export function transformSortedSetMemberNullReply( export function transformSortedSetMemberReply( reply: [RedisCommandArgument, RedisCommandArgument] ): ZMember { - return { value: reply[0], score: transformNumberInfinityReply(reply[1]) diff --git a/packages/client/lib/test-utils.ts b/packages/client/lib/test-utils.ts index d2e33b4abf..9032f1dd34 100644 --- a/packages/client/lib/test-utils.ts +++ b/packages/client/lib/test-utils.ts @@ -3,7 +3,6 @@ import { SinonSpy } from 'sinon'; import { promiseTimeout } from './utils'; export default new TestUtils({ - defaultDockerVersion: '7.0.2', dockerImageName: 'redis', dockerImageVersionArgument: 'redis-version' }); @@ -31,6 +30,14 @@ export const GLOBAL = { password: 'password' } } + }, + WITH_REPLICAS: { + serverArguments: [], + numberOfMasters: 2, + numberOfReplicas: 1, + clusterConfiguration: { + useReplicas: true + } } } }; diff --git a/packages/client/package.json b/packages/client/package.json index 43e7038136..42a512b644 100644 --- a/packages/client/package.json +++ b/packages/client/package.json @@ -1,6 +1,6 @@ { "name": "@redis/client", - "version": "1.4.2", + "version": "1.5.0", "license": "MIT", "main": "./dist/index.js", "types": "./dist/index.d.ts", diff --git a/packages/graph/lib/test-utils.ts b/packages/graph/lib/test-utils.ts index 4ae0e0a269..56c0af56a2 100644 --- a/packages/graph/lib/test-utils.ts +++ b/packages/graph/lib/test-utils.ts @@ -3,8 +3,7 @@ import RedisGraph from '.'; export default new TestUtils({ dockerImageName: 'redislabs/redisgraph', - dockerImageVersionArgument: 'redisgraph-version', - defaultDockerVersion: '2.8.15' + dockerImageVersionArgument: 'redisgraph-version' }); export const GLOBAL = { diff --git a/packages/json/lib/test-utils.ts b/packages/json/lib/test-utils.ts index fa150e4b7d..f4c4e4eb20 100644 --- a/packages/json/lib/test-utils.ts +++ b/packages/json/lib/test-utils.ts @@ -3,8 +3,7 @@ import RedisJSON from '.'; export default new TestUtils({ dockerImageName: 'redislabs/rejson', - dockerImageVersionArgument: 'rejson-version', - defaultDockerVersion: '2.0.9' + dockerImageVersionArgument: 'rejson-version' }); export const GLOBAL = { diff --git a/packages/test-utils/lib/dockers.ts b/packages/test-utils/lib/dockers.ts index 8f0be95b09..a7e1c610ee 100644 --- a/packages/test-utils/lib/dockers.ts +++ b/packages/test-utils/lib/dockers.ts @@ -1,8 +1,8 @@ import { createConnection } from 'net'; import { once } from 'events'; -import { RedisModules, RedisFunctions, RedisScripts } from '@redis/client/dist/lib/commands'; -import RedisClient, { RedisClientType } from '@redis/client/dist/lib/client'; +import RedisClient from '@redis/client/dist/lib/client'; import { promiseTimeout } from '@redis/client/dist/lib/utils'; +import { ClusterSlotsReply } from '@redis/client/dist/lib/commands/CLUSTER_SLOTS'; import * as path from 'path'; import { promisify } from 'util'; import { exec } from 'child_process'; @@ -64,7 +64,7 @@ async function spawnRedisServerDocker({ image, version }: RedisServerDockerConfi } while (await isPortAvailable(port)) { - await promiseTimeout(500); + await promiseTimeout(50); } return { @@ -102,17 +102,65 @@ after(() => { }); export interface RedisClusterDockersConfig extends RedisServerDockerConfig { - numberOfNodes?: number; + numberOfMasters?: number; + numberOfReplicas?: number; } -async function spawnRedisClusterNodeDocker( +async function spawnRedisClusterNodeDockers( dockersConfig: RedisClusterDockersConfig, serverArguments: Array, fromSlot: number, - toSlot: number, - waitForState: boolean, - meetPort?: number -): Promise { + toSlot: number +) { + const range: Array = []; + for (let i = fromSlot; i < toSlot; i++) { + range.push(i); + } + + const master = await spawnRedisClusterNodeDocker( + dockersConfig, + serverArguments + ); + + await master.client.clusterAddSlots(range); + + if (!dockersConfig.numberOfReplicas) return [master]; + + const replicasPromises: Array> = []; + for (let i = 0; i < (dockersConfig.numberOfReplicas ?? 0); i++) { + replicasPromises.push( + spawnRedisClusterNodeDocker(dockersConfig, [ + ...serverArguments, + '--cluster-enabled', + 'yes', + '--cluster-node-timeout', + '5000' + ]).then(async replica => { + await replica.client.clusterMeet('127.0.0.1', master.docker.port); + + while ((await replica.client.clusterSlots()).length === 0) { + await promiseTimeout(50); + } + + await replica.client.clusterReplicate( + await master.client.clusterMyId() + ); + + return replica; + }) + ); + } + + return [ + master, + ...await Promise.all(replicasPromises) + ]; +} + +async function spawnRedisClusterNodeDocker( + dockersConfig: RedisClusterDockersConfig, + serverArguments: Array +) { const docker = await spawnRedisServerDocker(dockersConfig, [ ...serverArguments, '--cluster-enabled', @@ -128,78 +176,64 @@ async function spawnRedisClusterNodeDocker( await client.connect(); - try { - const range = []; - for (let i = fromSlot; i < toSlot; i++) { - range.push(i); - } - - const promises: Array> = [client.clusterAddSlots(range)]; - - if (meetPort) { - promises.push(client.clusterMeet('127.0.0.1', meetPort)); - } - - if (waitForState) { - promises.push(waitForClusterState(client)); - } - - await Promise.all(promises); - - return docker; - } finally { - await client.disconnect(); - } -} - -async function waitForClusterState< - M extends RedisModules, - F extends RedisFunctions, - S extends RedisScripts ->(client: RedisClientType): Promise { - while ((await client.clusterInfo()).state !== 'ok') { - await promiseTimeout(500); - } + return { + docker, + client + }; } const SLOTS = 16384; -async function spawnRedisClusterDockers(dockersConfig: RedisClusterDockersConfig, serverArguments: Array): Promise> { - const numberOfNodes = dockersConfig.numberOfNodes ?? 3, - slotsPerNode = Math.floor(SLOTS / numberOfNodes), - dockers: Array = []; - for (let i = 0; i < numberOfNodes; i++) { +async function spawnRedisClusterDockers( + dockersConfig: RedisClusterDockersConfig, + serverArguments: Array +): Promise> { + const numberOfMasters = dockersConfig.numberOfMasters ?? 2, + slotsPerNode = Math.floor(SLOTS / numberOfMasters), + spawnPromises: Array> = []; + for (let i = 0; i < numberOfMasters; i++) { const fromSlot = i * slotsPerNode, - [ toSlot, waitForState ] = i === numberOfNodes - 1 ? [SLOTS, true] : [fromSlot + slotsPerNode, false]; - dockers.push( - await spawnRedisClusterNodeDocker( + toSlot = i === numberOfMasters - 1 ? SLOTS : fromSlot + slotsPerNode; + spawnPromises.push( + spawnRedisClusterNodeDockers( dockersConfig, serverArguments, fromSlot, - toSlot, - waitForState, - i === 0 ? undefined : dockers[i - 1].port + toSlot ) ); } - const client = RedisClient.create({ - socket: { - port: dockers[0].port - } - }); + const nodes = (await Promise.all(spawnPromises)).flat(), + meetPromises: Array> = []; + for (let i = 1; i < nodes.length; i++) { + meetPromises.push( + nodes[i].client.clusterMeet('127.0.0.1', nodes[0].docker.port) + ); + } - await client.connect(); + await Promise.all(meetPromises); - try { - while ((await client.clusterInfo()).state !== 'ok') { - await promiseTimeout(500); - } - } finally { - await client.disconnect(); + await Promise.all( + nodes.map(async ({ client }) => { + while (totalNodes(await client.clusterSlots()) !== nodes.length) { + await promiseTimeout(50); + } + + return client.disconnect(); + }) + ); + + return nodes.map(({ docker }) => docker); +} + +function totalNodes(slots: ClusterSlotsReply) { + let total = slots.length; + for (const slot of slots) { + total += slot.replicas.length; } - return dockers; + return total; } const RUNNING_CLUSTERS = new Map, ReturnType>(); diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index 7b9494c8d6..b9195c5717 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -9,7 +9,7 @@ import { hideBin } from 'yargs/helpers'; interface TestUtilsConfig { dockerImageName: string; dockerImageVersionArgument: string; - defaultDockerVersion: string; + defaultDockerVersion?: string; } interface CommonTestOptions { @@ -33,7 +33,8 @@ interface ClusterTestOptions< > extends CommonTestOptions { serverArguments: Array; clusterConfiguration?: Partial>; - numberOfNodes?: number; + numberOfMasters?: number; + numberOfReplicas?: number; } interface Version { @@ -43,7 +44,7 @@ interface Version { export default class TestUtils { static #parseVersionNumber(version: string): Array { - if (version === 'edge') return [Infinity]; + if (version === 'latest' || version === 'edge') return [Infinity]; const dashIndex = version.indexOf('-'); return (dashIndex === -1 ? version : version.substring(0, dashIndex)) @@ -58,7 +59,7 @@ export default class TestUtils { }); } - static #getVersion(argumentName: string, defaultVersion: string): Version { + static #getVersion(argumentName: string, defaultVersion = 'latest'): Version { return yargs(hideBin(process.argv)) .option(argumentName, { type: 'string', @@ -163,9 +164,13 @@ export default class TestUtils { M extends RedisModules, F extends RedisFunctions, S extends RedisScripts - >(cluster: RedisClusterType): Promise { - await Promise.all( - cluster.getMasters().map(({ client }) => client.flushAll()) + >(cluster: RedisClusterType): Promise { + return Promise.all( + cluster.masters.map(async ({ client }) => { + if (client) { + await (await client).flushAll(); + } + }) ); } @@ -186,7 +191,8 @@ export default class TestUtils { dockersPromise = spawnRedisCluster({ ...dockerImage, - numberOfNodes: options?.numberOfNodes + numberOfMasters: options?.numberOfMasters, + numberOfReplicas: options?.numberOfReplicas }, options.serverArguments); return dockersPromise; }); @@ -197,15 +203,15 @@ export default class TestUtils { const dockers = await dockersPromise, cluster = RedisCluster.create({ - ...options.clusterConfiguration, rootNodes: dockers.map(({ port }) => ({ socket: { port } - })) + })), + minimizeConnections: true, + ...options.clusterConfiguration }); - await cluster.connect(); try { diff --git a/packages/time-series/lib/test-utils.ts b/packages/time-series/lib/test-utils.ts index da88390602..6d534cccce 100644 --- a/packages/time-series/lib/test-utils.ts +++ b/packages/time-series/lib/test-utils.ts @@ -3,8 +3,7 @@ import TimeSeries from '.'; export default new TestUtils({ dockerImageName: 'redislabs/redistimeseries', - dockerImageVersionArgument: 'timeseries-version', - defaultDockerVersion: '1.8.0' + dockerImageVersionArgument: 'timeseries-version' }); export const GLOBAL = {