Skip to content

Commit

Permalink
socket-mode: do not throw if calling disconnect() and already disco…
Browse files Browse the repository at this point in the history
…nnected, and do not raise `slack_event` if message received is of `type: disconnect`
  • Loading branch information
Fil Maj authored Apr 30, 2024
1 parent 4008057 commit dc48959
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 29 deletions.
10 changes: 4 additions & 6 deletions packages/socket-mode/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@slack/socket-mode",
"version": "1.3.4",
"version": "1.3.5-rc.1",
"description": "Official library for using the Slack Platform's Socket Mode API",
"author": "Slack Technologies, LLC",
"license": "MIT",
Expand Down Expand Up @@ -40,19 +40,17 @@
"build": "npm run build:clean && tsc",
"build:clean": "shx rm -rf ./dist ./coverage ./.nyc_output",
"lint": "eslint --ext .ts src",
"test": "npm run lint && npm run build && nyc mocha --config .mocharc.json src/*.spec.js",
"test": "npm run lint && npm run build && nyc mocha --config .mocharc.json src/*.spec.js && npm run test:integration",
"test:integration": "mocha --config .mocharc.json test/integration.spec.js",
"watch": "npx nodemon --watch 'src' --ext 'ts' --exec npm run build"
},
"dependencies": {
"@slack/logger": "^3.0.0",
"@slack/web-api": "^6.11.2",
"@types/node": ">=12.0.0",
"@types/p-queue": "^2.3.2",
"@types/ws": "^7.4.7",
"eventemitter3": "^3.1.0",
"eventemitter3": "^5",
"finity": "^0.5.4",
"p-cancelable": "^1.1.0",
"p-queue": "^2.4.2",
"ws": "^7.5.3"
},
"devDependencies": {
Expand Down
39 changes: 16 additions & 23 deletions packages/socket-mode/src/SocketModeClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,10 @@ enum Event {
WebSocketOpen = 'websocket open',
WebSocketClose = 'websocket close',
ServerHello = 'server hello',
ServerDisconnectWarning = 'server disconnect warning',
ServerDisconnectOldSocket = 'server disconnect old socket',
ServerExplicitDisconnect = 'server explicit disconnect',
ServerPingsNotReceived = 'server pings not received',
ServerPongsNotReceived = 'server pongs not received',
ExplicitDisconnect = 'explicit disconnect',
ClientExplicitDisconnect = 'client explicit disconnect',
UnableToSocketModeStart = 'unable_to_socket_mode_start',
}

Expand Down Expand Up @@ -170,7 +169,7 @@ export class SocketModeClient extends EventEmitter {
}
});
// Delegate behavior to state machine
this.stateMachine.handle(Event.ExplicitDisconnect);
this.stateMachine.handle(Event.ClientExplicitDisconnect);
});
}

Expand Down Expand Up @@ -202,15 +201,17 @@ export class SocketModeClient extends EventEmitter {
.transitionTo(ConnectingState.Reconnecting).withCondition(this.reconnectingCondition.bind(this))
.transitionTo(ConnectingState.Failed)
.state(ConnectingState.Reconnecting)
.do(async () => {
.do(() => new Promise((res, _rej) => {
// Trying to reconnect after waiting for a bit...
this.numOfConsecutiveReconnectionFailures += 1;
const millisBeforeRetry = this.clientPingTimeoutMillis * this.numOfConsecutiveReconnectionFailures;
this.logger.debug(`Before trying to reconnect, this client will wait for ${millisBeforeRetry} milliseconds`);
setTimeout(() => {
this.emit(ConnectingState.Authenticating);
res(true);
}, millisBeforeRetry);
})
}))
.onSuccess().transitionTo(ConnectingState.Authenticating)
.onFailure().transitionTo(ConnectingState.Failed)
.state(ConnectingState.Authenticated)
.onEnter(this.configureAuthenticatedWebSocket.bind(this))
Expand Down Expand Up @@ -261,6 +262,8 @@ export class SocketModeClient extends EventEmitter {
.initialState(State.Disconnected)
.on(Event.Start)
.transitionTo(State.Connecting)
.on(Event.ClientExplicitDisconnect)
.transitionTo(State.Disconnected)
.state(State.Connecting)
.onEnter(() => {
this.logger.info('Going to establish a new connection to Slack ...');
Expand All @@ -271,7 +274,7 @@ export class SocketModeClient extends EventEmitter {
.on(Event.WebSocketClose)
.transitionTo(State.Reconnecting).withCondition(this.autoReconnectCondition.bind(this))
.transitionTo(State.Disconnecting)
.on(Event.ExplicitDisconnect)
.on(Event.ClientExplicitDisconnect)
.transitionTo(State.Disconnecting)
.on(Event.Failure)
.transitionTo(State.Disconnected)
Expand All @@ -289,7 +292,7 @@ export class SocketModeClient extends EventEmitter {
.withCondition(this.autoReconnectCondition.bind(this))
.withAction(() => this.markCurrentWebSocketAsInactive())
.transitionTo(State.Disconnecting)
.on(Event.ExplicitDisconnect)
.on(Event.ClientExplicitDisconnect)
.transitionTo(State.Disconnecting)
.withAction(() => this.markCurrentWebSocketAsInactive())
.on(Event.ServerPingsNotReceived)
Expand All @@ -298,10 +301,7 @@ export class SocketModeClient extends EventEmitter {
.on(Event.ServerPongsNotReceived)
.transitionTo(State.Reconnecting).withCondition(this.autoReconnectCondition.bind(this))
.transitionTo(State.Disconnecting)
.on(Event.ServerDisconnectWarning)
.transitionTo(State.Reconnecting).withCondition(this.autoReconnectCondition.bind(this))
.transitionTo(State.Disconnecting)
.on(Event.ServerDisconnectOldSocket)
.on(Event.ServerExplicitDisconnect)
.transitionTo(State.Reconnecting).withCondition(this.autoReconnectCondition.bind(this))
.transitionTo(State.Disconnecting)
.onExit(() => {
Expand Down Expand Up @@ -731,17 +731,10 @@ export class SocketModeClient extends EventEmitter {
return;
}

// Open the second WebSocket connection in preparation for the existing WebSocket disconnecting
if (event.type === 'disconnect' && event.reason === 'warning') {
this.logger.debug('Received "disconnect" (warning) message - creating the second connection');
this.stateMachine.handle(Event.ServerDisconnectWarning);
return;
}

// Close the primary WebSocket in favor of secondary WebSocket, assign secondary to primary
if (event.type === 'disconnect' && event.reason === 'refresh_requested') {
this.logger.debug('Received "disconnect" (refresh requested) message - closing the old WebSocket connection');
this.stateMachine.handle(Event.ServerDisconnectOldSocket);
if (event.type === 'disconnect') {
// Refresh the WebSocket connection when prompted by Slack backend
this.logger.debug(`Received "disconnect" (reason: ${event.reason}) message - will ${this.autoReconnectEnabled ? 'attempt reconnect' : 'disconnect (due to autoReconnectEnabled=false)'}`);
this.stateMachine.handle(Event.ServerExplicitDisconnect);
return;
}

Expand Down
245 changes: 245 additions & 0 deletions packages/socket-mode/test/integration.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
const { assert } = require('chai');
const { SocketModeClient} = require('../src/SocketModeClient');
const { LogLevel } = require('../src/logger');
const { Server: WebSocketServer } = require('ws');
const { createServer } = require('http');
const sinon = require('sinon');

const HTTP_PORT = 12345;
const WSS_PORT = 23456;
// Basic HTTP server to 'fake' behaviour of `apps.connections.open` endpoint
let server = null;

// Basic WS server to fake slack WS endpoint
let wss = null;
let exposed_ws_connection = null;

// Socket mode client pointing to the above two posers
let client = null;


const DISCONNECT_REASONS = ['warning', 'refresh_requested', 'too_many_websockets'];

describe('Integration tests with a WebSocket server', () => {
beforeEach(() => {
server = createServer((_req, res) => {
res.writeHead(200, { 'content-type': 'application/json' });
res.end(JSON.stringify({
ok: true,
url: `ws://localhost:${WSS_PORT}/`,
}));
});
server.listen(HTTP_PORT);
wss = new WebSocketServer({ port: WSS_PORT });
wss.on('connection', (ws) => {
ws.on('error', (err) => {
assert.fail(err);
});
// Send `Event.ServerHello`
ws.send(JSON.stringify({type: 'hello'}));
exposed_ws_connection = ws;
});
});
afterEach(() => {
server.close();
server = null;
wss.close();
wss = null;
exposed_ws_connection = null;
client = null;
});
describe('establishing connection, receiving valid messages', () => {
beforeEach(() => {
client = new SocketModeClient({ appToken: 'whatever', logLevel: LogLevel.ERROR, clientOptions: {
slackApiUrl: `http://localhost:${HTTP_PORT}/`
}});
});
it('connects to a server via `start()` and gracefully disconnects via `disconnect()`', async () => {
const connected = new Promise((res) => client.once('connected', res)); // due to legacy reasons, await start() does not wait for Connected state, so add additional check here for it
await client.start();
await connected;
await client.disconnect();
});
it('can call `disconnect()` even if already disconnected without issue', async () => {
await client.disconnect();
});
it('can listen on random event types and receive payload properties', async () => {
client.on('connected', () => {
exposed_ws_connection.send(JSON.stringify({
type: 'integration-test',
envelope_id: 12345,
}));
});
await client.start();
await new Promise((res, _rej) => {
client.on('integration-test', (evt) => {
assert.equal(evt.envelope_id, 12345);
res();
});
});
await client.disconnect();
});
});
describe('catastrophic server behaviour', () => {
beforeEach(() => {
client = new SocketModeClient({ appToken: 'whatever', logLevel: LogLevel.ERROR, clientOptions: {
slackApiUrl: `http://localhost:${HTTP_PORT}/`
}, clientPingTimeout: 25});
});
it('should retry if retrieving a WSS URL fails', async () => {
// Shut down the main WS-endpoint-retrieval server - we will customize its behaviour for this test
server.close();
let num_attempts = 0;
server = createServer((_req, res) => {
num_attempts += 1;
res.writeHead(200, { 'content-type': 'application/json' });
if (num_attempts < 3) {
res.end(JSON.stringify({
ok: false,
error: "fatal_error",
}));
} else {
res.end(JSON.stringify({
ok: true,
url: `ws://localhost:${WSS_PORT}/`,
}));
}
});
server.listen(HTTP_PORT);
await client.start();
assert.equal(num_attempts, 3);
await client.disconnect();
});
});
describe('failure modes / unexpected messages sent to client', () => {
let loggerSpy = sinon.stub();
const noop = () => {};
beforeEach(() => {
client = new SocketModeClient({ appToken: 'whatever', clientOptions: {
slackApiUrl: `http://localhost:${HTTP_PORT}/`
}, logger: {
debug: noop,
info: noop,
error: loggerSpy,
getLevel: () => 'error',
}});
});
afterEach(() => {
loggerSpy.resetHistory();
});
it('should error-log that a malformed JSON message was received', async () => {
client.on('connected', () => {
exposed_ws_connection.send('');
});
await client.start();
await sleep(10);
assert.isTrue(loggerSpy.calledWith(sinon.match('Unable to parse an incoming WebSocket message')));
await client.disconnect();
});
});
describe('lifecycle events', () => {
beforeEach(() => {
client = new SocketModeClient({ appToken: 'whatever', logLevel: LogLevel.ERROR, clientOptions: {
slackApiUrl: `http://localhost:${HTTP_PORT}/`
}});
});
it('raises connecting event during `start()`', async () => {
let raised = false;
client.on('connecting', () => { raised = true; });
const connected = new Promise((res) => client.once('connected', res)); // due to legacy reasons, await start() does not wait for Connected state, so add additional check here for it
await client.start();
await connected;
assert.isTrue(raised);
await client.disconnect();
});
it('raises authenticated event during `start()`', async () => {
let raised = false;
client.on('authenticated', () => { raised = true; });
const connected = new Promise((res) => client.once('connected', res)); // due to legacy reasons, await start() does not wait for Connected state, so add additional check here for it
await client.start();
await connected;
assert.isTrue(raised);
await client.disconnect();
});
it('raises disconnecting event during `disconnect()`', async () => {
let raised = false;
const connected = new Promise((res) => client.once('connected', res)); // due to legacy reasons, await start() does not wait for Connected state, so add additional check here for it
client.on('disconnecting', () => { raised = true; });
await client.start();
await connected;
await client.disconnect();
assert.isTrue(raised);
});
it('raises disconnected event after `disconnect()`', async () => {
let raised = false;
const connected = new Promise((res) => client.once('connected', res)); // due to legacy reasons, await start() does not wait for Connected state, so add additional check here for it
client.on('disconnected', () => { raised = true; });
await client.start();
await connected;
await client.disconnect();
assert.isTrue(raised);
});
describe('slack_event', () => {
beforeEach(() => {
// Disable auto reconnect for these tests
client = new SocketModeClient({ appToken: 'whatever', logLevel: LogLevel.ERROR, autoReconnectEnabled: false, clientOptions: {
slackApiUrl: `http://localhost:${HTTP_PORT}/`
}});
});
afterEach(async () => {
await client.disconnect();
});
// These tests check that specific type:disconnect events, of various reasons, sent by Slack backend are not raised as slack_events for apps to consume.
DISCONNECT_REASONS.forEach((reason) => {
it(`should not raise a type:disconnect reason:${reason} message as a slack_event`, async () => {
let raised = false;
client.on('connected', () => {
exposed_ws_connection.send(JSON.stringify({type:'disconnect', reason}));
});
client.on('slack_event', () => {
raised = true;
});
await client.start();
await sleep(10);
assert.isFalse(raised);
});
});
});
describe('including reconnection ability', () => {
it('raises reconnecting event after peer disconnects underlying WS connection', async () => {
const reconnectingWaiter = new Promise((res) => client.on('reconnecting', res));
const connected = new Promise((res) => client.once('connected', res)); // due to legacy reasons, await start() does not wait for Connected state, so add additional check here for it
await client.start();
await connected;
// force a WS disconnect from the server
exposed_ws_connection.terminate();
// create another waiter for post-reconnect connected event
const reconnectedWaiter = new Promise((res) => client.on('connected', res));
// if we pass the point where the reconnectingWaiter succeeded, then we have verified the reconnecting event is raised
// and this test can be considered passing. if we time out here, then that is an indication of a failure.
await reconnectingWaiter;
await reconnectedWaiter; // wait for this to ensure we dont raise an unexpected error by calling `disconnect` mid-reconnect.
await client.disconnect();
});
DISCONNECT_REASONS.forEach((reason) => {
it(`should reconnect gracefully if server sends a disconnect (reason: ${reason}) message`, async () => {
const connected = new Promise((res) => client.once('connected', res)); // due to legacy reasons, await start() does not wait for Connected state, so add additional check here for it
await client.start();
await connected;
// force a WS disconnect from the server
exposed_ws_connection.send(JSON.stringify({type:"disconnect", reason}));
// create a waiter for post-reconnect connected event
const reconnectedWaiter = new Promise((res) => client.on('connected', res));
// if we pass the point where the reconnectedWaiter succeeded, then we have verified the reconnection succeeded
// and this test can be considered passing. if we time out here, then that is an indication of a failure.
await reconnectedWaiter;
await client.disconnect();
});
});
});
});
});

function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}

0 comments on commit dc48959

Please sign in to comment.