Skip to content

Commit

Permalink
feat: implement utility methods from Socket.IO v4
Browse files Browse the repository at this point in the history
  • Loading branch information
darrachequesne committed Mar 12, 2021
1 parent fc19812 commit 468c3c8
Show file tree
Hide file tree
Showing 5 changed files with 375 additions and 73 deletions.
155 changes: 154 additions & 1 deletion lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ enum RequestType {
REMOTE_JOIN = 2,
REMOTE_LEAVE = 3,
REMOTE_DISCONNECT = 4,
REMOTE_FETCH = 5,
}

interface Request {
Expand Down Expand Up @@ -222,6 +223,14 @@ export class RedisAdapter extends Adapter {
break;

case RequestType.REMOTE_JOIN:
if (request.opts) {
const opts = {
rooms: new Set<Room>(request.opts.rooms),
except: new Set<Room>(request.opts.except),
};
return super.addSockets(opts, request.rooms);
}

socket = this.nsp.sockets.get(request.sid);
if (!socket) {
return;
Expand All @@ -237,6 +246,14 @@ export class RedisAdapter extends Adapter {
break;

case RequestType.REMOTE_LEAVE:
if (request.opts) {
const opts = {
rooms: new Set<Room>(request.opts.rooms),
except: new Set<Room>(request.opts.except),
};
return super.delSockets(opts, request.rooms);
}

socket = this.nsp.sockets.get(request.sid);
if (!socket) {
return;
Expand All @@ -252,6 +269,14 @@ export class RedisAdapter extends Adapter {
break;

case RequestType.REMOTE_DISCONNECT:
if (request.opts) {
const opts = {
rooms: new Set<Room>(request.opts.rooms),
except: new Set<Room>(request.opts.except),
};
return super.disconnectSockets(opts, request.close);
}

socket = this.nsp.sockets.get(request.sid);
if (!socket) {
return;
Expand All @@ -266,6 +291,30 @@ export class RedisAdapter extends Adapter {
this.pubClient.publish(this.responseChannel, response);
break;

case RequestType.REMOTE_FETCH:
if (this.requests.has(request.requestId)) {
return;
}

const opts = {
rooms: new Set<Room>(request.opts.rooms),
except: new Set<Room>(request.opts.except),
};
const localSockets = await super.fetchSockets(opts);

response = JSON.stringify({
requestId: request.requestId,
sockets: localSockets.map((socket) => ({
id: socket.id,
handshake: socket.handshake,
rooms: [...socket.rooms],
data: socket.data,
})),
});

this.pubClient.publish(this.responseChannel, response);
break;

default:
debug("ignoring unknown request type: %s", request.type);
}
Expand Down Expand Up @@ -299,12 +348,17 @@ export class RedisAdapter extends Adapter {

switch (request.type) {
case RequestType.SOCKETS:
case RequestType.REMOTE_FETCH:
request.msgCount++;

// ignore if response does not contain 'sockets' key
if (!response.sockets || !Array.isArray(response.sockets)) return;

response.sockets.forEach((s) => request.sockets.add(s));
if (request.type === RequestType.SOCKETS) {
response.sockets.forEach((s) => request.sockets.add(s));
} else {
response.sockets.forEach((s) => request.sockets.push(s));
}

if (request.msgCount === request.numSub) {
clearTimeout(request.timeout);
Expand Down Expand Up @@ -587,6 +641,105 @@ export class RedisAdapter extends Adapter {
});
}

public async fetchSockets(opts: BroadcastOptions): Promise<any[]> {
const localSockets = await super.fetchSockets(opts);

if (opts.flags?.local) {
return localSockets;
}

const numSub = await this.getNumSub();
debug('waiting for %d responses to "fetchSockets" request', numSub);

if (numSub <= 1) {
return localSockets;
}

const requestId = uid2(6);

const request = JSON.stringify({
requestId,
type: RequestType.REMOTE_FETCH,
opts: {
rooms: [...opts.rooms],
except: [...opts.except],
},
});

return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
if (this.requests.has(requestId)) {
reject(
new Error("timeout reached while waiting for fetchSockets response")
);
this.requests.delete(requestId);
}
}, this.requestsTimeout);

this.requests.set(requestId, {
type: RequestType.REMOTE_FETCH,
numSub,
resolve,
timeout,
msgCount: 1,
sockets: localSockets,
});

this.pubClient.publish(this.requestChannel, request);
});
}

public addSockets(opts: BroadcastOptions, rooms: Room[]) {
if (opts.flags?.local) {
return super.addSockets(opts, rooms);
}

const request = JSON.stringify({
type: RequestType.REMOTE_JOIN,
opts: {
rooms: [...opts.rooms],
except: [...opts.except],
},
rooms: [...rooms],
});

this.pubClient.publish(this.requestChannel, request);
}

public delSockets(opts: BroadcastOptions, rooms: Room[]) {
if (opts.flags?.local) {
return super.delSockets(opts, rooms);
}

const request = JSON.stringify({
type: RequestType.REMOTE_LEAVE,
opts: {
rooms: [...opts.rooms],
except: [...opts.except],
},
rooms: [...rooms],
});

this.pubClient.publish(this.requestChannel, request);
}

public disconnectSockets(opts: BroadcastOptions, close: boolean) {
if (opts.flags?.local) {
return super.disconnectSockets(opts, close);
}

const request = JSON.stringify({
type: RequestType.REMOTE_DISCONNECT,
opts: {
rooms: [...opts.rooms],
except: [...opts.except],
},
close,
});

this.pubClient.publish(this.requestChannel, request);
}

/**
* Get the number of subscribers of the request channel
*
Expand Down
Loading

0 comments on commit 468c3c8

Please sign in to comment.