Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KeepAlive: adds handling for cases when monitoring may be stopped #578

Merged
merged 5 commits into from
Jun 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"@types/p-retry": "^1.0.1",
"@types/retry": "^0.10.2",
"@types/url-join": "^0.8.2",
"@types/ws": "^4.0.1",
"@types/ws": "^5.1.1",
"delay": "^2.0.0",
"eventemitter3": "^3.0.0",
"finity": "^0.5.4",
Expand All @@ -70,7 +70,7 @@
"p-retry": "^1.0.0",
"retry": "^0.10.1",
"url-join": "^4.0.0",
"ws": "^4.1.0"
"ws": "^5.2.0"
},
"devDependencies": {
"@types/chai": "^4.1.2",
Expand Down
91 changes: 69 additions & 22 deletions src/KeepAlive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ export class KeepAlive extends EventEmitter {
*/
private pingTimer?: NodeJS.Timer;

/**
* A timer for when to stop listening for an incoming event that acknowledges the ping (counts as a pong)
*/
private pongTimer?: NodeJS.Timer;

/**
* The message ID of the latest ping sent, or undefined is there hasn't been one sent.
*/
Expand All @@ -60,12 +65,12 @@ export class KeepAlive extends EventEmitter {
/**
* Flag that indicates whether this object is still monitoring.
*/
public isMonitoring?: Boolean;
public isMonitoring: boolean;

/**
* Flag that indicates whether recommend_reconnect event has been emitted and stop() has not been called.
*/
public recommendReconnect?: Boolean;
public recommendReconnect: boolean;

constructor({
clientPingTimeout = 6000,
Expand All @@ -85,6 +90,9 @@ export class KeepAlive extends EventEmitter {
);
}

this.isMonitoring = false;
this.recommendReconnect = false;

// Logging
if (logger !== undefined) {
this.logger = loggerFromLoggingFunc(KeepAlive.loggerName, logger);
Expand All @@ -96,9 +104,11 @@ export class KeepAlive extends EventEmitter {

/**
* Start monitoring the RTMClient. This method should only be called after the client's websocket is already open.
* @param client
* @param client an RTMClient to monitor
*/
public start(client: RTMClient): void {
this.logger.debug('start monitoring');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make these a little less vague so they make more sense in the context of all the other debug logging an app will be doing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this is okay since each log line is prepended with the object name (e.g. KeepAlive), and in that context, monitoring only really has one meaning.


if (!client.connected) {
throw errorWithCode(
new Error(),
Expand All @@ -118,7 +128,14 @@ export class KeepAlive extends EventEmitter {
* after that.
*/
public stop(): void {
this.logger.debug('stop monitoring');

this.clearPreviousPingTimer();
this.clearPreviousPongTimer();
if (this.client !== undefined) {
this.client.off('outgoing_message', this.setPingTimer);
this.client.off('slack_event', this.attemptAcknowledgePong);
}
this.lastPing = this.client = undefined;
this.recommendReconnect = this.isMonitoring = false;
}
Expand Down Expand Up @@ -154,48 +171,48 @@ export class KeepAlive extends EventEmitter {
private sendPing(): void {
try {
if (this.client === undefined) {
if (!this.isMonitoring) {
// if monitoring stopped before the ping timer fires, its safe to return
this.logger.debug('stopped monitoring before ping timer fired');
return;
}
throw errorWithCode(new Error('no client found'), ErrorCode.KeepAliveInconsistentState);
}
this.logger.debug('ping timer expired, sending ping');
this.client.send('ping')
.then((messageId) => {
if (this.client === undefined) {
if (!this.isMonitoring) {
// if monitoring stopped before the ping is sent, its safe to return
this.logger.debug('stopped monitoring before outgoing ping message was finished');
return;
}
throw errorWithCode(new Error('no client found'), ErrorCode.KeepAliveInconsistentState);
}

this.lastPing = messageId;

const attemptAcknowledgePong = function (this: KeepAlive, _type: string, event: any): void {
if (this.client === undefined) {
throw errorWithCode(new Error('no client found'), ErrorCode.KeepAliveInconsistentState);
}

if (this.lastPing !== undefined && event.reply_to !== undefined && event.reply_to >= this.lastPing) {
// this message is a reply that acks the previous ping, clear the last ping
this.logger.debug('received pong, clearing pong timer');
delete this.lastPing;

// signal that this pong is done being handled
clearTimeout(pongTimer);
this.client.off('slack_event', attemptAcknowledgePong);
}
};

this.logger.debug('setting pong timer');
const pongTimer = setTimeout(() => {

this.pongTimer = setTimeout(() => {
if (this.client === undefined) {
// if monitoring stopped before the pong timer fires, its safe to return
if (!this.isMonitoring) {
this.logger.debug('stopped monitoring before pong timer fired');
return;
}
throw errorWithCode(new Error('no client found'), ErrorCode.KeepAliveInconsistentState);
}
// signal that this pong is done being handled
this.client.off('slack_event', attemptAcknowledgePong);
this.client.off('slack_event', this.attemptAcknowledgePong);

// no pong received to acknowledge the last ping within the serverPongTimeout
this.logger.debug('pong timer expired, recommend reconnect');
this.recommendReconnect = true;
this.emit('recommend_reconnect');
}, this.serverPongTimeout);

this.client.on('slack_event', attemptAcknowledgePong, this);
this.client.on('slack_event', this.attemptAcknowledgePong, this);
})
.catch((error) => {
this.logger.error(`Unhandled error: ${error.message}. Please report to @slack/client package maintainers.`);
Expand All @@ -204,4 +221,34 @@ export class KeepAlive extends EventEmitter {
this.logger.error(`Unhandled error: ${error.message}. Please report to @slack/client package maintainers.`);
}
}

/**
* Clears the pong timer if its set, otherwise this is a noop.
*/
private clearPreviousPongTimer(): void {
if (this.pongTimer !== undefined) {
clearTimeout(this.pongTimer);
}
}

/**
* Determines if a giving incoming event can be treated as an acknowledgement for the outstanding ping, and then
* clears the ping if so.
* @param event any incoming slack event
*/
private attemptAcknowledgePong(_type: string, event: any): void {
if (this.client === undefined) {
throw errorWithCode(new Error('no client found'), ErrorCode.KeepAliveInconsistentState);
}

if (this.lastPing !== undefined && event.reply_to !== undefined && event.reply_to >= this.lastPing) {
// this message is a reply that acks the previous ping, clear the last ping
this.logger.debug('received pong, clearing pong timer');
delete this.lastPing;

// signal that this pong is done being handled
this.clearPreviousPongTimer();
this.client.off('slack_event', this.attemptAcknowledgePong);
}
}
}
14 changes: 9 additions & 5 deletions src/RTMClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ export class RTMClient extends EventEmitter {
*/
private static loggerName = `${pkg.name}:RTMClient`;


/**
* This object's logger instance
*/
Expand Down Expand Up @@ -331,6 +330,11 @@ export class RTMClient extends EventEmitter {
if (this.websocket !== undefined) {
// this will trigger the 'websocket close' event on the state machine, which transitions to clean up
this.websocket.close();

// if the websocket actually is no longer connected, the eventual 'websocket close' event will take a long time,
// because it won't fire until the close handshake completes. in the meantime, stop the keep alive so we don't
// send pings on a dead connection.
this.keepAlive.stop();
}
}, this);

Expand All @@ -350,7 +354,7 @@ export class RTMClient extends EventEmitter {
/**
* Begin an RTM session using the provided options. This method must be called before any messages can
* be sent or received.
* @param options
* @param options arguments for the start method
*/
public start(options?: methods.RTMStartArguments | methods.RTMConnectArguments): void {
// TODO: should this return a Promise<WebAPICallResult>?
Expand Down Expand Up @@ -537,7 +541,7 @@ export class RTMClient extends EventEmitter {

/**
* Set up method for the client's websocket instance. This method will attach event listeners.
* @param url
* @param url the websocket url
*/
private setupWebsocket(url: string): void {
// initialize the websocket
Expand Down Expand Up @@ -575,7 +579,7 @@ export class RTMClient extends EventEmitter {
/**
* `onmessage` handler for the client's websocket. This will parse the payload and dispatch the relevant events for
* each incoming message.
* @param websocketMessage
* @param websocketMessage an incoming message
*/
private onWebsocketMessage({ data }: { data: string }): void {
// v3 legacy
Expand Down Expand Up @@ -673,7 +677,7 @@ export interface RTMWebsocketError extends CodedError {

/**
* A factory to create RTMWebsocketError objects.
* @param original
* @param original the original error
*/
function websocketErrorWithOriginal(original: Error): RTMWebsocketError {
const error = errorWithCode(
Expand Down
3 changes: 0 additions & 3 deletions src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ export enum ErrorCode {

/**
* Factory for producing a {@link CodedError} from a generic error
*
* @param error
* @param code
*/
export function errorWithCode(error: Error, code: ErrorCode): CodedError {
const codedError = error as Partial<CodedError>;
Expand Down
4 changes: 2 additions & 2 deletions src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ function isMoreSevere(level: LogLevel, threshold: number): boolean {

/**
* INTERNAL function for transforming an external LoggerFunc type into the internal Logger interface
* @param name
* @param loggingFunc
* @param name logger name assigned by object creating the logger
* @param loggingFunc a function to call with log data
*/
export function loggerFromLoggingFunc(name: string, loggingFunc: LoggingFunc): Logger {
const logger = log.getLogger(name);
Expand Down