Skip to content

Commit

Permalink
websocket: use linkedlist instead of Set
Browse files Browse the repository at this point in the history
  • Loading branch information
tsctx committed May 21, 2024
1 parent af3379f commit cc4f3de
Showing 1 changed file with 82 additions and 44 deletions.
126 changes: 82 additions & 44 deletions lib/web/websocket/sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,33 @@
const { WebsocketFrameSend } = require('./frame')
const { opcodes, sendHints } = require('./constants')

/** @type {Uint8Array} */
/** @type {typeof Uint8Array} */
const FastBuffer = Buffer[Symbol.species]

class SendQueue {
#queued = new Set()
#size = 0
/**
* @typedef {object} SendQueueNode
* @property {SendQueueNode | null} next
* @property {Promise<void> | null} promise
* @property {((...args: any[]) => any)} callback
* @property {Buffer | null} frame
*/

/** @type {import('net').Socket} */
class SendQueue {
/**
* @type {SendQueueNode | null}
*/
#head = null
/**
* @type {SendQueueNode | null}
*/
#tail = null

/**
* @type {boolean}
*/
#running = false

/** @type {import('node:net').Socket} */
#socket

constructor (socket) {
Expand All @@ -19,66 +38,85 @@ class SendQueue {

add (item, cb, hint) {
if (hint !== sendHints.blob) {
const data = clone(item, hint)

if (this.#size === 0) {
this.#dispatch(data, cb, hint)
const frame = createFrame(item, hint)
if (!this.#running) {
// fast-path
this.#socket.write(frame, cb)
} else {
this.#queued.add([data, cb, true, hint])
this.#size++

this.#run()
/** @type {SendQueueNode} */
const node = {
next: null,
promise: null,
callback: cb,
frame
}
if (this.#tail !== null) {
this.#tail.next = node
}
this.#tail = node
}

return
}

const promise = item.arrayBuffer()
const queue = [null, cb, false, hint]
promise.then((ab) => {
queue[0] = clone(ab, hint)
queue[2] = true

this.#run()
})

this.#queued.add(queue)
this.#size++
}

#run () {
for (const queued of this.#queued) {
const [data, cb, done, hint] = queued
/** @type {SendQueueNode} */
const node = {
next: null,
promise: item.arrayBuffer().then((ab) => {
node.promise = null
node.frame = createFrame(ab, hint)
}),
callback: cb,
frame: null
}

if (!done) return
if (this.#tail === null) {
this.#tail = node
}

this.#queued.delete(queued)
this.#size--
if (this.#head === null) {
this.#head = node
}

this.#dispatch(data, cb, hint)
if (!this.#running) {
this.#run()
}
}

#dispatch (data, cb, hint) {
const frame = new WebsocketFrameSend()
const opcode = hint === sendHints.string ? opcodes.TEXT : opcodes.BINARY

frame.frameData = data
const buffer = frame.createFrame(opcode)

this.#socket.write(buffer, cb)
async #run () {
this.#running = true
/** @type {SendQueueNode | null} */
let node = this.#head
while (node !== null) {
// wait pending promise
if (node.promise !== null) {
await node.promise
}
// write
this.#socket.write(node.frame, node.callback)
// cleanup
node.callback = node.frame = null
// set next
node = node.next
}
this.#head = null
this.#tail = null
this.#running = false
}
}

function clone (data, hint) {
function createFrame (data, hint) {
return new WebsocketFrameSend(toBuffer(data, hint)).createFrame(hint === sendHints.string ? opcodes.TEXT : opcodes.BINARY)
}

function toBuffer (data, hint) {
switch (hint) {
case sendHints.string:
return Buffer.from(data)
case sendHints.arrayBuffer:
case sendHints.blob:
return new FastBuffer(data)
case sendHints.typedArray:
return Buffer.copyBytesFrom(data)
return new FastBuffer(data.buffer, data.byteOffset, data.byteLength)
}
}

Expand Down

0 comments on commit cc4f3de

Please sign in to comment.