diff --git a/package-lock.json b/package-lock.json index 426cc09..cf29580 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,7 @@ "version": "10.2.6", "license": "MIT", "dependencies": { + "@azure/web-pubsub-client": "^1.0.0-beta.3", "lib0": "^0.2.42", "simple-peer": "^9.11.0", "y-protocols": "^1.0.6" @@ -43,6 +44,96 @@ "yjs": "^13.6.8" } }, + "node_modules/@azure/abort-controller": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@azure/abort-controller/-/abort-controller-1.1.0.tgz", + "integrity": "sha512-TrRLIoSQVzfAJX9H1JeFjzAoDGcoK1IYX1UImfceTZpsyYfWr09Ss1aHW1y5TrrR3iq6RZLBwJ3E24uwPhwahw==", + "dependencies": { + "tslib": "^2.2.0" + }, + "engines": { + "node": ">=12.0.0" + } + }, + "node_modules/@azure/abort-controller/node_modules/tslib": { + "version": "2.6.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", + "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" + }, + "node_modules/@azure/core-util": { + "version": "1.6.1", + "resolved": "https://registry.npmjs.org/@azure/core-util/-/core-util-1.6.1.tgz", + "integrity": "sha512-h5taHeySlsV9qxuK64KZxy4iln1BtMYlNt5jbuEFN3UFSAd1EwKg/Gjl5a6tZ/W8t6li3xPnutOx7zbDyXnPmQ==", + "dependencies": { + "@azure/abort-controller": "^1.0.0", + "tslib": "^2.2.0" + }, + "engines": { + "node": ">=16.0.0" + } + }, + "node_modules/@azure/core-util/node_modules/tslib": { + "version": "2.6.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", + "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" + }, + "node_modules/@azure/logger": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/@azure/logger/-/logger-1.0.4.tgz", + "integrity": "sha512-ustrPY8MryhloQj7OWGe+HrYx+aoiOxzbXTtgblbV3xwCqpzUK36phH3XNHQKj3EPonyFUuDTfR3qFhTEAuZEg==", + "dependencies": { + "tslib": "^2.2.0" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@azure/logger/node_modules/tslib": { + "version": "2.6.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", + "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" + }, + "node_modules/@azure/web-pubsub-client": { + "version": "1.0.0-beta.3", + "resolved": "https://registry.npmjs.org/@azure/web-pubsub-client/-/web-pubsub-client-1.0.0-beta.3.tgz", + "integrity": "sha512-Vp/WOvXU2dbTP49TfPogWJKhYmZYq3rfyeS5mpysAvTaBkkYyu6IjEavsnpx9+tbDNuv1hucjhfhH1zdpS0PAQ==", + "dependencies": { + "@azure/abort-controller": "^1.0.0", + "@azure/core-util": "^1.1.1", + "@azure/logger": "^1.0.0", + "buffer": "^6.0.0", + "tslib": "^2.2.0", + "ws": "^7.4.5" + }, + "engines": { + "node": ">=14.0.0" + } + }, + "node_modules/@azure/web-pubsub-client/node_modules/tslib": { + "version": "2.6.2", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.2.tgz", + "integrity": "sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==" + }, + "node_modules/@azure/web-pubsub-client/node_modules/ws": { + "version": "7.5.9", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.9.tgz", + "integrity": "sha512-F+P9Jil7UiSKSkppIiD94dN07AwvFixvLIj1Og1Rl9GGMuNipJnV9JzjD6XuqmAeiswGvUmNLjr5cFuXwNS77Q==", + "engines": { + "node": ">=8.3.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": "^5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/@babel/code-frame": { "version": "7.22.13", "resolved": "https://registry.npmjs.org/@babel/code-frame/-/code-frame-7.22.13.tgz", diff --git a/package.json b/package.json index 84b16aa..1b93541 100644 --- a/package.json +++ b/package.json @@ -58,6 +58,7 @@ ] }, "dependencies": { + "@azure/web-pubsub-client": "^1.0.0-beta.3", "lib0": "^0.2.42", "simple-peer": "^9.11.0", "y-protocols": "^1.0.6" diff --git a/src/y-webrtc.js b/src/y-webrtc.js index 91a77c3..63a93a8 100644 --- a/src/y-webrtc.js +++ b/src/y-webrtc.js @@ -1,4 +1,4 @@ -import * as ws from 'lib0/websocket' +// import * as ws from 'lib0/websocket' import * as map from 'lib0/map' import * as error from 'lib0/error' import * as random from 'lib0/random' @@ -20,6 +20,8 @@ import * as awarenessProtocol from 'y-protocols/awareness' import * as cryptoutils from './crypto.js' +import { WebPubSubClient } from "@azure/web-pubsub-client"; + const log = logging.createModuleLogger('y-webrtc') const messageSync = 0 @@ -270,7 +272,7 @@ const announceSignalingInfo = room => { signalingConns.forEach(conn => { // only subscribe if connection is established, otherwise the conn automatically subscribes to all rooms if (conn.connected) { - conn.send({ type: 'subscribe', topics: [room.name] }) + conn.client.joinGroup(room.name) if (room.webrtcConns.size < room.provider.maxConns) { publishSignalingMessage(conn, room, { type: 'announce', from: room.peerId }) } @@ -413,7 +415,7 @@ export class Room { // signal through all available signaling connections signalingConns.forEach(conn => { if (conn.connected) { - conn.send({ type: 'unsubscribe', topics: [this.name] }) + conn.client.leaveGroup(this.name) } }) awarenessProtocol.removeAwarenessStates(this.awareness, [this.doc.clientID], 'disconnect') @@ -466,96 +468,103 @@ const openRoom = (doc, provider, name, key) => { const publishSignalingMessage = (conn, room, data) => { if (room.key) { cryptoutils.encryptJson(data, room.key).then(data => { - conn.send({ type: 'publish', topic: room.name, data: buffer.toBase64(data) }) + conn.client.sendToGroup(room.name, buffer.toBase64(data), "text") }) } else { - conn.send({ type: 'publish', topic: room.name, data }) + conn.client.sendToGroup(room.name, data, "json") } } -export class SignalingConn extends ws.WebsocketClient { +export class SignalingConn extends Observable { constructor (url) { - super(url) + super() + this.url = url + this.connected = false /** * @type {Set} */ this.providers = new Set() - this.on('connect', () => { - log(`connected (${url})`) - const topics = Array.from(rooms.keys()) - this.send({ type: 'subscribe', topics }) + this.client = new WebPubSubClient(url) + this.client.on('connected', e => { + this.connected = true + log(`connected (${url}) with ID ${e.connectionId}`) + // Join all the groups. + const groups = Array.from(rooms.keys()) + groups.forEach(group => + this.client.joinGroup(group) + ) rooms.forEach(room => publishSignalingMessage(this, room, { type: 'announce', from: room.peerId }) ) }) - this.on('message', m => { - switch (m.type) { - case 'publish': { - const roomName = m.topic - const room = rooms.get(roomName) - if (room == null || typeof roomName !== 'string') { - return - } - const execMessage = data => { - const webrtcConns = room.webrtcConns - const peerId = room.peerId - if (data == null || data.from === peerId || (data.to !== undefined && data.to !== peerId) || room.bcConns.has(data.from)) { - // ignore messages that are not addressed to this conn, or from clients that are connected via broadcastchannel - return + this.client.on('disconnected', e => log(`disconnect (${url}): ${e.message}`)) + this.client.on('stopped', () => log(`stopped (${url})`)) + // Set an event handler for group messages before connecting, so we don't miss any. + this.client.on('group-message', e => { + const roomName = e.message.group + const room = rooms.get(roomName) + if (room == null || typeof roomName !== 'string') { + return + } + const execMessage = data => { + const webrtcConns = room.webrtcConns + const peerId = room.peerId + if (data == null || data.from === peerId || (data.to !== undefined && data.to !== peerId) || room.bcConns.has(data.from)) { + // ignore messages that are not addressed to this conn, or from clients that are connected via broadcastchannel + return + } + const emitPeerChange = webrtcConns.has(data.from) + ? () => {} + : () => + room.provider.emit('peers', [{ + removed: [], + added: [data.from], + webrtcPeers: Array.from(room.webrtcConns.keys()), + bcPeers: Array.from(room.bcConns) + }]) + switch (data.type) { + case 'announce': + if (webrtcConns.size < room.provider.maxConns) { + map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, true, data.from, room)) + emitPeerChange() } - const emitPeerChange = webrtcConns.has(data.from) - ? () => {} - : () => - room.provider.emit('peers', [{ - removed: [], - added: [data.from], - webrtcPeers: Array.from(room.webrtcConns.keys()), - bcPeers: Array.from(room.bcConns) - }]) - switch (data.type) { - case 'announce': - if (webrtcConns.size < room.provider.maxConns) { - map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, true, data.from, room)) - emitPeerChange() - } - break - case 'signal': - if (data.signal.type === 'offer') { - const existingConn = webrtcConns.get(data.from) - if (existingConn) { - const remoteToken = data.token - const localToken = existingConn.glareToken - if (localToken && localToken > remoteToken) { - log('offer rejected: ', data.from) - return - } - // if we don't reject the offer, we will be accepting it and answering it - existingConn.glareToken = undefined - } + break + case 'signal': + if (data.signal.type === 'offer') { + const existingConn = webrtcConns.get(data.from) + if (existingConn) { + const remoteToken = data.token + const localToken = existingConn.glareToken + if (localToken && localToken > remoteToken) { + log('offer rejected: ', data.from) + return } - if (data.signal.type === 'answer') { - log('offer answered by: ', data.from) - const existingConn = webrtcConns.get(data.from) - existingConn.glareToken = undefined - } - if (data.to === peerId) { - map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, false, data.from, room)).peer.signal(data.signal) - emitPeerChange() - } - break + // if we don't reject the offer, we will be accepting it and answering it + existingConn.glareToken = undefined + } } - } - if (room.key) { - if (typeof m.data === 'string') { - cryptoutils.decryptJson(buffer.fromBase64(m.data), room.key).then(execMessage) + if (data.signal.type === 'answer') { + log('offer answered by: ', data.from) + const existingConn = webrtcConns.get(data.from) + existingConn.glareToken = undefined } - } else { - execMessage(m.data) - } + if (data.to === peerId) { + map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, false, data.from, room)).peer.signal(data.signal) + emitPeerChange() + } + break + } + } + if (room.key) { + if (typeof e.message.data === 'string') { + cryptoutils.decryptJson(buffer.fromBase64(e.message.data), room.key).then(execMessage) } + } else { + execMessage(e.message.data) } }) - this.on('disconnect', () => log(`disconnect (${url})`)) + // Connect to the signaling server. + this.client.start() } } @@ -648,7 +657,7 @@ export class WebrtcProvider extends Observable { this.signalingConns.forEach(conn => { conn.providers.delete(this) if (conn.providers.size === 0) { - conn.destroy() + conn.client.stop() signalingConns.delete(conn.url) } })