-
Notifications
You must be signed in to change notification settings - Fork 572
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* cleanup websocket receiver * fix 9.3.2-9.3.8; 9.4.2-9.4.8; 10.1.1 * fixup * fixup
- Loading branch information
Showing
1 changed file
with
57 additions
and
132 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,11 +12,11 @@ const { | |
websocketMessageReceived, | ||
utf8Decode, | ||
isControlFrame, | ||
isContinuationFrame, | ||
isTextBinaryFrame | ||
isTextBinaryFrame, | ||
isContinuationFrame | ||
} = require('./util') | ||
const { WebsocketFrameSend } = require('./frame') | ||
const { CloseEvent } = require('./events') | ||
const { closeWebSocketConnection } = require('./connection') | ||
|
||
// This code was influenced by ws released under the MIT license. | ||
// Copyright (c) 2011 Einar Otto Stangvik <[email protected]> | ||
|
@@ -26,6 +26,7 @@ const { CloseEvent } = require('./events') | |
class ByteParser extends Writable { | ||
#buffers = [] | ||
#byteOffset = 0 | ||
#loop = false | ||
|
||
#state = parserStates.INFO | ||
|
||
|
@@ -45,6 +46,7 @@ class ByteParser extends Writable { | |
_write (chunk, _, callback) { | ||
this.#buffers.push(chunk) | ||
this.#byteOffset += chunk.length | ||
this.#loop = true | ||
|
||
this.run(callback) | ||
} | ||
|
@@ -55,7 +57,7 @@ class ByteParser extends Writable { | |
* or not enough bytes are buffered to parse. | ||
*/ | ||
run (callback) { | ||
while (true) { | ||
while (this.#loop) { | ||
if (this.#state === parserStates.INFO) { | ||
// If there aren't enough bytes to parse the payload length, etc. | ||
if (this.#byteOffset < 2) { | ||
|
@@ -67,6 +69,13 @@ class ByteParser extends Writable { | |
const opcode = buffer[0] & 0x0F | ||
const masked = (buffer[1] & 0x80) === 0x80 | ||
|
||
const fragmented = !fin && opcode !== opcodes.CONTINUATION | ||
const payloadLength = buffer[1] & 0x7F | ||
|
||
const rsv1 = buffer[0] & 0x40 | ||
const rsv2 = buffer[0] & 0x20 | ||
const rsv3 = buffer[0] & 0x10 | ||
|
||
if (!isValidOpcode(opcode)) { | ||
failWebsocketConnection(this.ws, 'Invalid opcode received') | ||
return callback() | ||
|
@@ -77,22 +86,16 @@ class ByteParser extends Writable { | |
return callback() | ||
} | ||
|
||
const rsv1 = (buffer[0] & 0x40) !== 0 | ||
const rsv2 = (buffer[0] & 0x20) !== 0 | ||
const rsv3 = (buffer[0] & 0x10) !== 0 | ||
|
||
// MUST be 0 unless an extension is negotiated that defines meanings | ||
// for non-zero values. If a nonzero value is received and none of | ||
// the negotiated extensions defines the meaning of such a nonzero | ||
// value, the receiving endpoint MUST _Fail the WebSocket | ||
// Connection_. | ||
if (rsv1 || rsv2 || rsv3) { | ||
if (rsv1 !== 0 || rsv2 !== 0 || rsv3 !== 0) { | ||
failWebsocketConnection(this.ws, 'RSV1, RSV2, RSV3 must be clear') | ||
return | ||
} | ||
|
||
const fragmented = !fin && opcode !== opcodes.CONTINUATION | ||
|
||
if (fragmented && !isTextBinaryFrame(opcode)) { | ||
// Only text and binary frames can be fragmented | ||
failWebsocketConnection(this.ws, 'Invalid frame type was fragmented.') | ||
|
@@ -101,39 +104,27 @@ class ByteParser extends Writable { | |
|
||
// If we are already parsing a text/binary frame and do not receive either | ||
// a continuation frame or close frame, fail the connection. | ||
if (isTextBinaryFrame(opcode) && this.#info.opcode !== undefined) { | ||
if (isTextBinaryFrame(opcode) && this.#fragments.length > 0) { | ||
failWebsocketConnection(this.ws, 'Expected continuation frame') | ||
return | ||
} | ||
|
||
const payloadLength = buffer[1] & 0x7F | ||
|
||
if (isControlFrame(opcode)) { | ||
const loop = this.parseControlFrame(callback, { | ||
header: buffer, | ||
opcode, | ||
fragmented, | ||
payloadLength | ||
}) | ||
if (this.#info.fragmented && fragmented) { | ||
// A fragmented frame can't be fragmented itself | ||
failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.') | ||
return | ||
} | ||
|
||
if (loop) { | ||
continue | ||
} else { | ||
return | ||
} | ||
} else if (isContinuationFrame(opcode)) { | ||
const loop = this.parseContinuationFrame(callback, { | ||
header: buffer, | ||
fin, | ||
fragmented, | ||
payloadLength | ||
}) | ||
// "All control frames MUST have a payload length of 125 bytes or less | ||
// and MUST NOT be fragmented." | ||
if ((payloadLength > 125 || fragmented) && isControlFrame(opcode)) { | ||
failWebsocketConnection(this.ws, 'Control frame either too large or fragmented') | ||
return | ||
} | ||
|
||
if (loop) { | ||
continue | ||
} else { | ||
return | ||
} | ||
if (isContinuationFrame(opcode) && this.#fragments.length === 0) { | ||
failWebsocketConnection(this.ws, 'Unexpected continuation frame') | ||
return | ||
} | ||
|
||
if (payloadLength <= 125) { | ||
|
@@ -145,16 +136,14 @@ class ByteParser extends Writable { | |
this.#state = parserStates.PAYLOADLENGTH_64 | ||
} | ||
|
||
if (isTextBinaryFrame(opcode)) { | ||
this.#info.binaryType = opcode | ||
} | ||
|
||
this.#info.opcode = opcode | ||
this.#info.masked = masked | ||
this.#info.fin = fin | ||
this.#info.fragmented = fragmented | ||
|
||
if (this.#info.fragmented && payloadLength > 125) { | ||
// A fragmented frame can't be fragmented itself | ||
failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.') | ||
return | ||
} | ||
} else if (this.#state === parserStates.PAYLOADLENGTH_16) { | ||
if (this.#byteOffset < 2) { | ||
return callback() | ||
|
@@ -189,42 +178,40 @@ class ByteParser extends Writable { | |
this.#state = parserStates.READ_DATA | ||
} else if (this.#state === parserStates.READ_DATA) { | ||
if (this.#byteOffset < this.#info.payloadLength) { | ||
// If there is still more data in this chunk that needs to be read | ||
return callback() | ||
} else if (this.#byteOffset >= this.#info.payloadLength) { | ||
const body = this.consume(this.#info.payloadLength) | ||
} | ||
|
||
const body = this.consume(this.#info.payloadLength) | ||
|
||
if (isControlFrame(this.#info.opcode)) { | ||
this.#loop = this.parseControlFrame(body) | ||
} else { | ||
this.#fragments.push(body) | ||
|
||
// If the frame is not fragmented, a message has been received. | ||
// If the frame is fragmented, it will terminate with a fin bit set | ||
// and an opcode of 0 (continuation), therefore we handle that when | ||
// parsing continuation frames, not here. | ||
if (!this.#info.fragmented) { | ||
if (!this.#info.fragmented && this.#info.fin) { | ||
const fullMessage = Buffer.concat(this.#fragments) | ||
websocketMessageReceived(this.ws, this.#info.opcode, fullMessage) | ||
this.#info = {} | ||
websocketMessageReceived(this.ws, this.#info.binaryType, fullMessage) | ||
this.#fragments.length = 0 | ||
} | ||
|
||
this.#state = parserStates.INFO | ||
} | ||
} | ||
|
||
if (this.#byteOffset === 0 && this.#info.payloadLength !== 0) { | ||
callback() | ||
break | ||
this.#state = parserStates.INFO | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Take n bytes from the buffered Buffers | ||
* @param {number} n | ||
* @returns {Buffer|null} | ||
* @returns {Buffer} | ||
*/ | ||
consume (n) { | ||
if (n > this.#byteOffset) { | ||
return null | ||
throw new Error('Called consume() before buffers satiated.') | ||
} else if (n === 0) { | ||
return emptyBuffer | ||
} | ||
|
@@ -297,40 +284,25 @@ class ByteParser extends Writable { | |
|
||
/** | ||
* Parses control frames. | ||
* @param {Buffer} data | ||
* @param {(err?: Error) => void} callback | ||
* @param {{ opcode: number, fragmented: boolean, payloadLength: number, header: Buffer }} info | ||
* @param {Buffer} body | ||
*/ | ||
parseControlFrame (callback, info) { | ||
assert(!info.fragmented) | ||
|
||
if (info.payloadLength > 125) { | ||
// Control frames can have a payload length of 125 bytes MAX | ||
failWebsocketConnection(this.ws, 'Payload length for control frame exceeded 125 bytes.') | ||
return false | ||
} else if (this.#byteOffset < info.payloadLength) { | ||
this.#buffers.unshift(info.header) | ||
this.#byteOffset += 2 | ||
|
||
callback() | ||
return false | ||
} | ||
|
||
const body = this.consume(info.payloadLength) | ||
parseControlFrame (body) { | ||
const { opcode, payloadLength } = this.#info | ||
|
||
if (info.opcode === opcodes.CLOSE) { | ||
if (info.payloadLength === 1) { | ||
if (opcode === opcodes.CLOSE) { | ||
if (payloadLength === 1) { | ||
failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.') | ||
return | ||
return false | ||
} | ||
|
||
this.#info.closeInfo = this.parseCloseBody(body) | ||
|
||
if (this.#info.closeInfo.error) { | ||
const { code, reason } = this.#info.closeInfo | ||
|
||
callback(new CloseEvent('close', { wasClean: false, reason, code })) | ||
return | ||
closeWebSocketConnection(this.ws, code, reason, reason.length) | ||
failWebsocketConnection(this.ws, reason) | ||
return false | ||
} | ||
|
||
if (this.ws[kSentClose] !== sentCloseFrameState.SENT) { | ||
|
@@ -362,9 +334,8 @@ class ByteParser extends Writable { | |
this.ws[kReceivedClose] = true | ||
|
||
this.end() | ||
|
||
return | ||
} else if (info.opcode === opcodes.PING) { | ||
return false | ||
} else if (opcode === opcodes.PING) { | ||
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in | ||
// response, unless it already received a Close frame. | ||
// A Pong frame sent in response to a Ping frame must have identical | ||
|
@@ -381,12 +352,7 @@ class ByteParser extends Writable { | |
}) | ||
} | ||
} | ||
|
||
if (this.#byteOffset <= 0) { | ||
callback() | ||
return false | ||
} | ||
} else if (info.opcode === opcodes.PONG) { | ||
} else if (opcode === opcodes.PONG) { | ||
// A Pong frame MAY be sent unsolicited. This serves as a | ||
// unidirectional heartbeat. A response to an unsolicited Pong frame is | ||
// not expected. | ||
|
@@ -396,47 +362,6 @@ class ByteParser extends Writable { | |
payload: body | ||
}) | ||
} | ||
|
||
if (this.#byteOffset <= 0) { | ||
callback() | ||
return false | ||
} | ||
} | ||
|
||
return true | ||
} | ||
|
||
/** | ||
* Parses continuation frames. | ||
* @param {Buffer} data | ||
* @param {(err?: Error) => void} callback | ||
* @param {{ fin: boolean, fragmented: boolean, payloadLength: number, header: Buffer }} info | ||
*/ | ||
parseContinuationFrame (callback, info) { | ||
// If we received a continuation frame before we started parsing another frame. | ||
if (this.#info.opcode === undefined) { | ||
failWebsocketConnection(this.ws, 'Received unexpected continuation frame.') | ||
return false | ||
} else if (this.#byteOffset < info.payloadLength) { | ||
this.#buffers.unshift(info.header) | ||
this.#byteOffset += 2 | ||
|
||
callback() | ||
return false | ||
} | ||
|
||
const body = this.consume(info.payloadLength) | ||
this.#fragments.push(body) | ||
|
||
// A fragmented message consists of a single frame with the FIN bit | ||
// clear and an opcode other than 0, followed by zero or more frames | ||
// with the FIN bit clear and the opcode set to 0, and terminated by | ||
// a single frame with the FIN bit set and an opcode of 0. | ||
if (info.fin) { | ||
const message = Buffer.concat(this.#fragments) | ||
websocketMessageReceived(this.ws, this.#info.opcode, message) | ||
this.#fragments.length = 0 | ||
this.#info = {} | ||
} | ||
|
||
return true | ||
|