-
-
Notifications
You must be signed in to change notification settings - Fork 121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Azure webpubsub #59
Closed
Closed
Azure webpubsub #59
Changes from 2 commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
e0e22b2
use Azure Web PubSub as signaling server
byrond 17b8b05
add azure web pubsub client
byrond 8be2fc4
revert to original module logger name
byrond ca12e7d
Merge tag 'v10.2.6' into azure-webpubsub-integration
byrond 9d3ae7a
Merge pull request #1 from palantirnet/azure-webpubsub-integration
byrond File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
import * as ws from 'lib0/websocket' | ||
// import * as ws from 'lib0/websocket' | ||
import * as map from 'lib0/map' | ||
import * as error from 'lib0/error' | ||
import * as random from 'lib0/random' | ||
|
@@ -20,7 +20,9 @@ import * as awarenessProtocol from 'y-protocols/awareness' | |
|
||
import * as cryptoutils from './crypto.js' | ||
|
||
const log = logging.createModuleLogger('y-webrtc') | ||
import { WebPubSubClient } from "@azure/web-pubsub-client"; | ||
|
||
const log = logging.createModuleLogger('y-webrtc-azure-webpubsub') | ||
|
||
const messageSync = 0 | ||
const messageQueryAwareness = 3 | ||
|
@@ -265,7 +267,7 @@ const announceSignalingInfo = room => { | |
signalingConns.forEach(conn => { | ||
// only subcribe if connection is established, otherwise the conn automatically subscribes to all rooms | ||
if (conn.connected) { | ||
conn.send({ type: 'subscribe', topics: [room.name] }) | ||
conn.client.joinGroup(room.name) | ||
if (room.webrtcConns.size < room.provider.maxConns) { | ||
publishSignalingMessage(conn, room, { type: 'announce', from: room.peerId }) | ||
} | ||
|
@@ -408,7 +410,7 @@ export class Room { | |
// signal through all available signaling connections | ||
signalingConns.forEach(conn => { | ||
if (conn.connected) { | ||
conn.send({ type: 'unsubscribe', topics: [this.name] }) | ||
conn.client.leaveGroup(this.name) | ||
} | ||
}) | ||
awarenessProtocol.removeAwarenessStates(this.awareness, [this.doc.clientID], 'disconnect') | ||
|
@@ -461,78 +463,85 @@ const openRoom = (doc, provider, name, key) => { | |
const publishSignalingMessage = (conn, room, data) => { | ||
if (room.key) { | ||
cryptoutils.encryptJson(data, room.key).then(data => { | ||
conn.send({ type: 'publish', topic: room.name, data: buffer.toBase64(data) }) | ||
conn.client.sendToGroup(room.name, buffer.toBase64(data), "text") | ||
}) | ||
} else { | ||
conn.send({ type: 'publish', topic: room.name, data }) | ||
conn.client.sendToGroup(room.name, data, "json") | ||
} | ||
} | ||
|
||
export class SignalingConn extends ws.WebsocketClient { | ||
export class SignalingConn extends Observable { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't actually need to extend |
||
constructor (url) { | ||
super(url) | ||
super() | ||
this.url = url | ||
this.connected = false | ||
/** | ||
* @type {Set<WebrtcProvider>} | ||
*/ | ||
this.providers = new Set() | ||
this.on('connect', () => { | ||
log(`connected (${url})`) | ||
const topics = Array.from(rooms.keys()) | ||
this.send({ type: 'subscribe', topics }) | ||
this.client = new WebPubSubClient(url) | ||
this.client.on('connected', e => { | ||
this.connected = true | ||
log(`connected (${url}) with ID ${e.connectionId}`) | ||
// Join all the groups. | ||
const groups = Array.from(rooms.keys()) | ||
groups.forEach(group => | ||
this.client.joinGroup(group) | ||
) | ||
rooms.forEach(room => | ||
publishSignalingMessage(this, room, { type: 'announce', from: room.peerId }) | ||
) | ||
}) | ||
this.on('message', m => { | ||
switch (m.type) { | ||
case 'publish': { | ||
const roomName = m.topic | ||
const room = rooms.get(roomName) | ||
if (room == null || typeof roomName !== 'string') { | ||
return | ||
} | ||
const execMessage = data => { | ||
const webrtcConns = room.webrtcConns | ||
const peerId = room.peerId | ||
if (data == null || data.from === peerId || (data.to !== undefined && data.to !== peerId) || room.bcConns.has(data.from)) { | ||
// ignore messages that are not addressed to this conn, or from clients that are connected via broadcastchannel | ||
return | ||
} | ||
const emitPeerChange = webrtcConns.has(data.from) | ||
? () => {} | ||
: () => | ||
room.provider.emit('peers', [{ | ||
removed: [], | ||
added: [data.from], | ||
webrtcPeers: Array.from(room.webrtcConns.keys()), | ||
bcPeers: Array.from(room.bcConns) | ||
}]) | ||
switch (data.type) { | ||
case 'announce': | ||
if (webrtcConns.size < room.provider.maxConns) { | ||
map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, true, data.from, room)) | ||
emitPeerChange() | ||
} | ||
break | ||
case 'signal': | ||
if (data.to === peerId) { | ||
map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, false, data.from, room)).peer.signal(data.signal) | ||
emitPeerChange() | ||
} | ||
break | ||
this.client.on('disconnected', e => log(`disconnect (${url}): ${e.message}`)) | ||
this.client.on('stopped', () => log(`stopped (${url})`)) | ||
// Set an event handler for group messages before connecting, so we don't miss any. | ||
this.client.on('group-message', e => { | ||
const roomName = e.message.group | ||
const room = rooms.get(roomName) | ||
if (room == null || typeof roomName !== 'string') { | ||
return | ||
} | ||
const execMessage = data => { | ||
const webrtcConns = room.webrtcConns | ||
const peerId = room.peerId | ||
if (data == null || data.from === peerId || (data.to !== undefined && data.to !== peerId) || room.bcConns.has(data.from)) { | ||
// ignore messages that are not addressed to this conn, or from clients that are connected via broadcastchannel | ||
return | ||
} | ||
const emitPeerChange = webrtcConns.has(data.from) | ||
? () => {} | ||
: () => | ||
room.provider.emit('peers', [{ | ||
removed: [], | ||
added: [data.from], | ||
webrtcPeers: Array.from(room.webrtcConns.keys()), | ||
bcPeers: Array.from(room.bcConns) | ||
}]) | ||
switch (data.type) { | ||
case 'announce': | ||
if (webrtcConns.size < room.provider.maxConns) { | ||
map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, true, data.from, room)) | ||
emitPeerChange() | ||
} | ||
} | ||
if (room.key) { | ||
if (typeof m.data === 'string') { | ||
cryptoutils.decryptJson(buffer.fromBase64(m.data), room.key).then(execMessage) | ||
break | ||
case 'signal': | ||
if (data.to === peerId) { | ||
map.setIfUndefined(webrtcConns, data.from, () => new WebrtcConn(this, false, data.from, room)).peer.signal(data.signal) | ||
emitPeerChange() | ||
} | ||
} else { | ||
execMessage(m.data) | ||
} | ||
break | ||
} | ||
} | ||
if (room.key) { | ||
if (typeof e.message.data === 'string') { | ||
cryptoutils.decryptJson(buffer.fromBase64(e.message.data), room.key).then(execMessage) | ||
} | ||
} else { | ||
execMessage(e.message.data) | ||
} | ||
}) | ||
this.on('disconnect', () => log(`disconnect (${url})`)) | ||
// Connect to the signaling server. | ||
this.client.start() | ||
} | ||
} | ||
|
||
|
@@ -625,7 +634,7 @@ export class WebrtcProvider extends Observable { | |
this.signalingConns.forEach(conn => { | ||
conn.providers.delete(this) | ||
if (conn.providers.size === 0) { | ||
conn.destroy() | ||
conn.client.stop() | ||
signalingConns.delete(conn.url) | ||
} | ||
}) | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I copied this code from a local copy of the module that was renamed to
y-webrtc-azure-webpubsub