From e0e22b23f03af4ed6d04148f6fcab5bbf98afa08 Mon Sep 17 00:00:00 2001 From: Byron Duvall Date: Wed, 15 Nov 2023 10:03:22 -0500 Subject: [PATCH 1/3] use Azure Web PubSub as signaling server --- src/y-webrtc.js | 125 ++++++++++++++++++++++++++---------------------- 1 file changed, 67 insertions(+), 58 deletions(-) diff --git a/src/y-webrtc.js b/src/y-webrtc.js index af4604d..98e4c71 100644 --- a/src/y-webrtc.js +++ b/src/y-webrtc.js @@ -1,4 +1,4 @@ -import * as ws from 'lib0/websocket' +// import * as ws from 'lib0/websocket' import * as map from 'lib0/map' import * as error from 'lib0/error' import * as random from 'lib0/random' @@ -20,7 +20,9 @@ import * as awarenessProtocol from 'y-protocols/awareness' import * as cryptoutils from './crypto.js' -const log = logging.createModuleLogger('y-webrtc') +import { WebPubSubClient } from "@azure/web-pubsub-client"; + +const log = logging.createModuleLogger('y-webrtc-azure-webpubsub') const messageSync = 0 const messageQueryAwareness = 3 @@ -265,7 +267,7 @@ const announceSignalingInfo = room => { signalingConns.forEach(conn => { // only subcribe if connection is established, otherwise the conn automatically subscribes to all rooms if (conn.connected) { - conn.send({ type: 'subscribe', topics: [room.name] }) + conn.client.joinGroup(room.name) if (room.webrtcConns.size < room.provider.maxConns) { publishSignalingMessage(conn, room, { type: 'announce', from: room.peerId }) } @@ -408,7 +410,7 @@ export class Room { // signal through all available signaling connections signalingConns.forEach(conn => { if (conn.connected) { - conn.send({ type: 'unsubscribe', topics: [this.name] }) + conn.client.leaveGroup(this.name) } }) awarenessProtocol.removeAwarenessStates(this.awareness, [this.doc.clientID], 'disconnect') @@ -461,78 +463,85 @@ const openRoom = (doc, provider, name, key) => { const publishSignalingMessage = (conn, room, data) => { if (room.key) { cryptoutils.encryptJson(data, room.key).then(data => { - conn.send({ type: 'publish', topic: room.name, data: buffer.toBase64(data) }) + conn.client.sendToGroup(room.name, buffer.toBase64(data), "text") }) } else { - conn.send({ type: 'publish', topic: room.name, data }) + conn.client.sendToGroup(room.name, data, "json") } } -export class SignalingConn extends ws.WebsocketClient { +export class SignalingConn extends Observable { constructor (url) { - super(url) + super() + this.url = url + this.connected = false /** * @type {Set} */ this.providers = new Set() - this.on('connect', () => { - log(`connected (${url})`) - const topics = Array.from(rooms.keys()) - this.send({ type: 'subscribe', topics }) + this.client = new WebPubSubClient(url) + this.client.on('connected', e => { + this.connected = true + log(`connected (${url}) with ID ${e.connectionId}`) + // Join all the groups. + const groups = Array.from(rooms.keys()) + groups.forEach(group => + this.client.joinGroup(group) + ) rooms.forEach(room => publishSignalingMessage(this, room, { type: 'announce', from: room.peerId }) ) }) - this.on('message', m => { - switch (m.type) { - case 'publish': { - const roomName = m.topic - const room = rooms.get(roomName) - if (room == null || typeof roomName !== 'string') { - return - } - const execMessage = data => { - const webrtcConns = room.webrtcConns - const peerId = room.peerId - if (data == null || data.from === peerId || (data.to !== undefined && data.to !== peerId) || room.bcConns.has(data.from)) { - // ignore messages that are not addressed to this conn, or from clients that are connected via broadcastchannel - return - } - const emitPeerChange = webrtcConns.has(data.from) - ? () => {} - : () => - room.provider.emit('peers', [{ - removed: [], - added: [data.from], - webrtcPeers: Array.from(room.webrtcConns.keys()), - bcPeers: Array.from(room.bcConns) - }]) - switch (data.type) { - case 'announce': - if (webrtcConns.size < room.provider.maxConns) { - map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, true, data.from, room)) - emitPeerChange() - } - break - case 'signal': - if (data.to === peerId) { - map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, false, data.from, room)).peer.signal(data.signal) - emitPeerChange() - } - break + this.client.on('disconnected', e => log(`disconnect (${url}): ${e.message}`)) + this.client.on('stopped', () => log(`stopped (${url})`)) + // Set an event handler for group messages before connecting, so we don't miss any. + this.client.on('group-message', e => { + const roomName = e.message.group + const room = rooms.get(roomName) + if (room == null || typeof roomName !== 'string') { + return + } + const execMessage = data => { + const webrtcConns = room.webrtcConns + const peerId = room.peerId + if (data == null || data.from === peerId || (data.to !== undefined && data.to !== peerId) || room.bcConns.has(data.from)) { + // ignore messages that are not addressed to this conn, or from clients that are connected via broadcastchannel + return + } + const emitPeerChange = webrtcConns.has(data.from) + ? () => {} + : () => + room.provider.emit('peers', [{ + removed: [], + added: [data.from], + webrtcPeers: Array.from(room.webrtcConns.keys()), + bcPeers: Array.from(room.bcConns) + }]) + switch (data.type) { + case 'announce': + if (webrtcConns.size < room.provider.maxConns) { + map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, true, data.from, room)) + emitPeerChange() } - } - if (room.key) { - if (typeof m.data === 'string') { - cryptoutils.decryptJson(buffer.fromBase64(m.data), room.key).then(execMessage) + break + case 'signal': + if (data.to === peerId) { + map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, false, data.from, room)).peer.signal(data.signal) + emitPeerChange() } - } else { - execMessage(m.data) - } + break } } + if (room.key) { + if (typeof e.message.data === 'string') { + cryptoutils.decryptJson(buffer.fromBase64(e.message.data), room.key).then(execMessage) + } + } else { + execMessage(e.message.data) + } }) - this.on('disconnect', () => log(`disconnect (${url})`)) + // Connect to the signaling server. + this.client.start() } } @@ -625,7 +634,7 @@ export class WebrtcProvider extends Observable { this.signalingConns.forEach(conn => { conn.providers.delete(this) if (conn.providers.size === 0) { - conn.destroy() + conn.client.stop() signalingConns.delete(conn.url) } }) From 17b8b05fee6ba466fdac3af6dadf3b6de866dcfe Mon Sep 17 00:00:00 2001 From: Byron Duvall Date: Wed, 15 Nov 2023 10:10:13 -0500 Subject: [PATCH 2/3] add azure web pubsub client --- package-lock.json | 139 +++++++++++++++++++++++++++++++++++++++++++++- package.json | 1 + 2 files changed, 138 insertions(+), 2 deletions(-) diff --git a/package-lock.json b/package-lock.json index 637f18a..89ff2b1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,7 @@ "version": "10.2.5", "license": "MIT", "dependencies": { + "@azure/web-pubsub-client": "^1.0.0-beta.3", "lib0": "^0.2.42", "simple-peer": "^9.11.0", "y-protocols": "^1.0.5" @@ -40,6 +41,76 @@ "ws": "^7.2.0" } }, + "node_modules/@azure/abort-controller": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@azure/abort-controller/-/abort-controller-1.1.0.tgz", + "integrity": "sha512-TrRLIoSQVzfAJX9H1JeFjzAoDGcoK1IYX1UImfceTZpsyYfWr09Ss1aHW1y5TrrR3iq6RZLBwJ3E24uwPhwahw==", + "dependencies": { + "tslib": "^2.2.0" + }, + "engines": { + "node": ">=12.0.0" + } + }, + "node_modules/@azure/abort-controller/node_modules/tslib": { + "version": "2.6.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", + "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" + }, + "node_modules/@azure/core-util": { + "version": "1.6.1", + "resolved": "https://registry.npmjs.org/@azure/core-util/-/core-util-1.6.1.tgz", + "integrity": "sha512-h5taHeySlsV9qxuK64KZxy4iln1BtMYlNt5jbuEFN3UFSAd1EwKg/Gjl5a6tZ/W8t6li3xPnutOx7zbDyXnPmQ==", + "dependencies": { + "@azure/abort-controller": "^1.0.0", + "tslib": "^2.2.0" + }, + "engines": { + "node": ">=16.0.0" + } + }, + "node_modules/@azure/core-util/node_modules/tslib": { + "version": "2.6.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", + "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" + }, + "node_modules/@azure/logger": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@azure/logger/-/logger-1.0.4.tgz", + "integrity": "sha512-ustrPY8MryhloQj7OWGe+HrYx+aoiOxzbXTtgblbV3xwCqpzUK36phH3XNHQKj3EPonyFUuDTfR3qFhTEAuZEg==", + "dependencies": { + "tslib": "^2.2.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@azure/logger/node_modules/tslib": { + "version": "2.6.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", + "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" + }, + "node_modules/@azure/web-pubsub-client": { + "version": "1.0.0-beta.3", + "resolved": "https://registry.npmjs.org/@azure/web-pubsub-client/-/web-pubsub-client-1.0.0-beta.3.tgz", + "integrity": "sha512-Vp/WOvXU2dbTP49TfPogWJKhYmZYq3rfyeS5mpysAvTaBkkYyu6IjEavsnpx9+tbDNuv1hucjhfhH1zdpS0PAQ==", + "dependencies": { + "@azure/abort-controller": "^1.0.0", + "@azure/core-util": "^1.1.1", + "@azure/logger": "^1.0.0", + "buffer": "^6.0.0", + "tslib": "^2.2.0", + "ws": "^7.4.5" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@azure/web-pubsub-client/node_modules/tslib": { + "version": "2.6.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", + "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" + }, "node_modules/@babel/code-frame": { "version": "7.18.6", "resolved": "https://registry.npmjs.org/@babel/code-frame/-/code-frame-7.18.6.tgz", @@ -3994,7 +4065,6 @@ "version": "7.5.9", "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.9.tgz", "integrity": "sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==", - "optional": true, "engines": { "node": ">=8.3.0" }, @@ -4171,6 +4241,72 @@ } }, "dependencies": { + "@azure/abort-controller": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@azure/abort-controller/-/abort-controller-1.1.0.tgz", + "integrity": "sha512-TrRLIoSQVzfAJX9H1JeFjzAoDGcoK1IYX1UImfceTZpsyYfWr09Ss1aHW1y5TrrR3iq6RZLBwJ3E24uwPhwahw==", + "requires": { + "tslib": "^2.2.0" + }, + "dependencies": { + "tslib": { + "version": "2.6.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", + "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" + } + } + }, + "@azure/core-util": { + "version": "1.6.1", + "resolved": "https://registry.npmjs.org/@azure/core-util/-/core-util-1.6.1.tgz", + "integrity": "sha512-h5taHeySlsV9qxuK64KZxy4iln1BtMYlNt5jbuEFN3UFSAd1EwKg/Gjl5a6tZ/W8t6li3xPnutOx7zbDyXnPmQ==", + "requires": { + "@azure/abort-controller": "^1.0.0", + "tslib": "^2.2.0" + }, + "dependencies": { + "tslib": { + "version": "2.6.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", + "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" + } + } + }, + "@azure/logger": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@azure/logger/-/logger-1.0.4.tgz", + "integrity": "sha512-ustrPY8MryhloQj7OWGe+HrYx+aoiOxzbXTtgblbV3xwCqpzUK36phH3XNHQKj3EPonyFUuDTfR3qFhTEAuZEg==", + "requires": { + "tslib": "^2.2.0" + }, + "dependencies": { + "tslib": { + "version": "2.6.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", + "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" + } + } + }, + "@azure/web-pubsub-client": { + "version": "1.0.0-beta.3", + "resolved": "https://registry.npmjs.org/@azure/web-pubsub-client/-/web-pubsub-client-1.0.0-beta.3.tgz", + "integrity": "sha512-Vp/WOvXU2dbTP49TfPogWJKhYmZYq3rfyeS5mpysAvTaBkkYyu6IjEavsnpx9+tbDNuv1hucjhfhH1zdpS0PAQ==", + "requires": { + "@azure/abort-controller": "^1.0.0", + "@azure/core-util": "^1.1.1", + "@azure/logger": "^1.0.0", + "buffer": "^6.0.0", + "tslib": "^2.2.0", + "ws": "^7.4.5" + }, + "dependencies": { + "tslib": { + "version": "2.6.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", + "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" + } + } + }, "@babel/code-frame": { "version": "7.18.6", "resolved": "https://registry.npmjs.org/@babel/code-frame/-/code-frame-7.18.6.tgz", @@ -7116,7 +7252,6 @@ "version": "7.5.9", "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.9.tgz", "integrity": "sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==", - "optional": true, "requires": {} }, "xtend": { diff --git a/package.json b/package.json index 4efbb8f..f08b5dc 100644 --- a/package.json +++ b/package.json @@ -57,6 +57,7 @@ ] }, "dependencies": { + "@azure/web-pubsub-client": "^1.0.0-beta.3", "lib0": "^0.2.42", "simple-peer": "^9.11.0", "y-protocols": "^1.0.5" From 8be2fc41b5cdf0b9a8e2e79e200297f22b426d1d Mon Sep 17 00:00:00 2001 From: Byron Duvall Date: Wed, 15 Nov 2023 13:29:30 -0500 Subject: [PATCH 3/3] revert to original module logger name --- src/y-webrtc.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/y-webrtc.js b/src/y-webrtc.js index 98e4c71..f03bc98 100644 --- a/src/y-webrtc.js +++ b/src/y-webrtc.js @@ -22,7 +22,7 @@ import * as cryptoutils from './crypto.js' import { WebPubSubClient } from "@azure/web-pubsub-client"; -const log = logging.createModuleLogger('y-webrtc-azure-webpubsub') +const log = logging.createModuleLogger('y-webrtc') const messageSync = 0 const messageQueryAwareness = 3