Skip to content

Commit

Permalink
fix(sharded): fix SSUBSCRIBE memory leak with ioredis (#529)
Browse files Browse the repository at this point in the history
This pull request introduces a change to the sharded adapter's `SSUBSCRIBE` logic: Previously, for each dynamic channel/room, a unique listener was added to the client's `smessageBuffer`. This approach led to a large number of listeners (resulting in `MaxListenersExceededWarning`), especially in scenarios with many dynamic channels. Further, listeners were not being removed when unsubscribing, leading to a memory leak.

The new implementation replaces the multiple listeners with a single `smessageBuffer` listener. This listener is registered once and handles all dynamic channels by maintaining specific channel handlers in a `Map`. Listeners are added to this `Map` in `SSUBSCRIBE` and removed from the `Map` in `SUNSUBSCRIBE`.

Related: #528
  • Loading branch information
RoccoC authored Mar 13, 2024
1 parent bd32763 commit 2113e8d
Showing 1 changed file with 17 additions and 6 deletions.
23 changes: 17 additions & 6 deletions lib/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ function isRedisV4Client(redisClient: any) {
return typeof redisClient.sSubscribe === "function";
}

const kHandlers = Symbol("handlers");

export function SSUBSCRIBE(
redisClient: any,
channel: string,
Expand All @@ -66,13 +68,17 @@ export function SSUBSCRIBE(
if (isRedisV4Client(redisClient)) {
redisClient.sSubscribe(channel, handler, RETURN_BUFFERS);
} else {
if (!redisClient[kHandlers]) {
redisClient[kHandlers] = new Map();
redisClient.on("smessageBuffer", (rawChannel, message) => {
redisClient[kHandlers].get(rawChannel.toString())?.(
message,
rawChannel
);
});
}
redisClient[kHandlers].set(channel, handler);
redisClient.ssubscribe(channel);

redisClient.on("smessageBuffer", (rawChannel, message) => {
if (rawChannel.toString() === channel) {
handler(message, rawChannel);
}
});
}
}

Expand All @@ -81,6 +87,11 @@ export function SUNSUBSCRIBE(redisClient: any, channel: string | string[]) {
redisClient.sUnsubscribe(channel);
} else {
redisClient.sunsubscribe(channel);
if (Array.isArray(channel)) {
channel.forEach((c) => redisClient[kHandlers].delete(c));
} else {
redisClient[kHandlers].delete(channel);
}
}
}

Expand Down

0 comments on commit 2113e8d

Please sign in to comment.