Skip to content

Commit

Permalink
allow for different implementations of streams, nop by default
Browse files Browse the repository at this point in the history
  • Loading branch information
lost-illusi0n committed Sep 8, 2021
1 parent 093caba commit 42e0ee4
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 36 deletions.
7 changes: 2 additions & 5 deletions voice/src/main/kotlin/VoiceConnection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import dev.kord.voice.gateway.*
import dev.kord.voice.handlers.StreamsHandler
import dev.kord.voice.handlers.UdpLifeCycleHandler
import dev.kord.voice.handlers.VoiceUpdateEventHandler
import dev.kord.voice.streams.Streams
import dev.kord.voice.udp.*
import dev.kord.voice.udp.AudioFrameSender
import kotlinx.coroutines.*
Expand Down Expand Up @@ -50,6 +51,7 @@ class VoiceConnection(
val voiceGateway: VoiceGateway,
val udp: VoiceUdpConnection,
var voiceGatewayConfiguration: VoiceGatewayConfiguration,
val streams: Streams,
val audioProvider: AudioProvider,
val frameSender: AudioFrameSender,
val frameInterceptorFactory: (FrameInterceptorContext) -> FrameInterceptor,
Expand All @@ -58,11 +60,6 @@ class VoiceConnection(
override val coroutineContext: CoroutineContext =
SupervisorJob() + voiceDispatcher + CoroutineName("Voice Connection for Guild ${data.guildId.value}")

/**
* A representation of all incoming audio streams through the UDP connection.
*/
val streams: Streams = Streams(this, voiceDispatcher)

init {
// handle voice state/server updates (e.g., a move, disconnect, voice server change, etc.)
VoiceUpdateEventHandler(gateway.events, this)
Expand Down
16 changes: 16 additions & 0 deletions voice/src/main/kotlin/VoiceConnectionBuilder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import dev.kord.common.entity.Snowflake
import dev.kord.gateway.*
import dev.kord.voice.exception.VoiceConnectionInitializationException
import dev.kord.voice.gateway.*
import dev.kord.voice.streams.DefaultStreams
import dev.kord.voice.streams.NOPStreams
import dev.kord.voice.streams.Streams
import dev.kord.voice.udp.*
import dev.kord.voice.udp.DefaultVoiceUdpConnection
import io.ktor.client.*
Expand Down Expand Up @@ -87,6 +90,17 @@ class VoiceConnectionBuilder(
*/
var udp: VoiceUdpConnection? = null

/**
* A flag to control the implementation of [streams]. Set to false by default.
* When set to false, a NOP implementation will be used.
* When set to true, a proper receiving implementation will be used.
*/
var receiveVoice: Boolean = false

/**
* A [Streams] implementation to be used. This will override the [receiveVoice] flag.
*/
var streams: Streams? = null

/**
* A builder to customize the voice connection's underlying [VoiceGateway].
Expand Down Expand Up @@ -133,13 +147,15 @@ class VoiceConnectionBuilder(
val audioProvider = audioProvider ?: EmptyAudioPlayerProvider
val audioSender = audioSender ?: DefaultAudioFrameSender(DefaultAudioFrameSenderData(udp, defaultDispatcher))
val frameInterceptorFactory = frameInterceptorFactory ?: { DefaultFrameInterceptor(it) }
val streams = streams ?: if(receiveVoice) DefaultStreams(voiceGateway, udp, defaultDispatcher) else NOPStreams

return VoiceConnection(
voiceConnectionData,
gateway,
voiceGateway,
udp,
initialGatewayConfiguration,
streams,
audioProvider,
audioSender,
frameInterceptorFactory,
Expand Down
4 changes: 2 additions & 2 deletions voice/src/main/kotlin/handlers/StreamsHandler.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package dev.kord.voice.handlers

import dev.kord.common.annotation.KordVoice
import dev.kord.voice.Streams
import dev.kord.voice.gateway.*
import dev.kord.voice.gateway.handler.Handler
import dev.kord.voice.streams.Streams
import kotlinx.coroutines.flow.Flow

@OptIn(KordVoice::class)
Expand All @@ -14,7 +14,7 @@ internal class StreamsHandler(
@OptIn(ExperimentalUnsignedTypes::class)
override fun start() {
on<SessionDescription> {
streams.key.value = it.secretKey.toUByteArray().toByteArray()
streams.key = it.secretKey.toUByteArray().toByteArray()
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package dev.kord.voice
package dev.kord.voice.streams

import dev.kord.common.annotation.KordVoice
import dev.kord.common.entity.Snowflake
import dev.kord.voice.AudioFrame
import dev.kord.voice.gateway.Speaking
import dev.kord.voice.gateway.on
import dev.kord.voice.gateway.VoiceGateway
import dev.kord.voice.udp.AudioPacket
import dev.kord.voice.udp.VoiceUdpConnection
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
Expand All @@ -17,48 +19,35 @@ import kotlinx.coroutines.flow.*
import kotlin.coroutines.CoroutineContext

@KordVoice
class Streams(
connection: VoiceConnection,
class DefaultStreams(
voiceGateway: VoiceGateway,
udp: VoiceUdpConnection,
dispatcher: CoroutineDispatcher
) : CoroutineScope {
) : Streams, CoroutineScope {
override val coroutineContext: CoroutineContext =
SupervisorJob() + dispatcher + CoroutineName("Voice Connection Incoming Streams")

internal val key: AtomicRef<ByteArray?> = atomic(null)
override var key: ByteArray? by atomic(null)

/**
* A flow of all incoming [dev.kord.voice.udp.AudioPacket.DecryptedPacket]s through the UDP connection.
*/
val incomingAudioPackets: SharedFlow<AudioPacket.DecryptedPacket> =
connection.udp
.incoming
override val incomingAudioPackets: SharedFlow<AudioPacket.DecryptedPacket> =
udp.incoming
.mapNotNull(AudioPacket::encryptedFrom)
.map { it.decrypt(key.value!!) }
.map { it.decrypt(key!!) }
.shareIn(this, SharingStarted.Lazily)

/**
* A flow of all incoming [AudioFrame]s mapped to their ssrc.
*/
val incomingAudioFrames get() = incomingAudioPackets.map { it.ssrc to AudioFrame.fromData(it.data)!! }
override val incomingAudioFrames: Flow<Pair<UInt, AudioFrame>>
get() = incomingAudioPackets.map { it.ssrc to AudioFrame.fromData(it.data)!! }

private val _incomingUserAudioFrames: MutableSharedFlow<Pair<Snowflake, AudioFrame>> = MutableSharedFlow()

/**
* A flow of incoming [AudioFrame]s mapped to its corresponding user id. Streams for each user are built over time,
* or whenever the [dev.kord.voice.gateway.VoiceGateway] receives a [Speaking] event.
*/
val incomingUserStreams: SharedFlow<Pair<Snowflake, AudioFrame>> = _incomingUserAudioFrames
override val incomingUserStreams: SharedFlow<Pair<Snowflake, AudioFrame>> = _incomingUserAudioFrames

private val _ssrcToUser: AtomicRef<MutableMap<UInt, Snowflake>> = atomic(mutableMapOf())

/**
* A mapping of ssrc to user id.
* This cache is built over time through the [dev.kord.voice.gateway.VoiceGateway].
*/
val ssrcToUser: Map<UInt, Snowflake> by _ssrcToUser
override val ssrcToUser: Map<UInt, Snowflake> by _ssrcToUser

init {
connection.voiceGateway.events
voiceGateway.events
.filterIsInstance<Speaking>()
.buffer(Channel.UNLIMITED)
.onEach { speaking ->
Expand All @@ -68,7 +57,7 @@ class Streams(
.filter { (ssrc, _) -> speaking.ssrc == ssrc }
.map { (_, frame) -> speaking.userId to frame }
.onEach { value -> _incomingUserAudioFrames.emit(value) }
.launchIn(this@Streams)
.launchIn(this)

speaking.userId
}
Expand Down
25 changes: 25 additions & 0 deletions voice/src/main/kotlin/streams/NOPStreams.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package dev.kord.voice.streams

import dev.kord.common.annotation.KordVoice
import dev.kord.common.entity.Snowflake
import dev.kord.voice.AudioFrame
import dev.kord.voice.udp.AudioPacket
import kotlinx.coroutines.flow.Flow

@KordVoice
object NOPStreams : Streams {
override var key: ByteArray? = null

override val incomingAudioPackets: Flow<AudioPacket.DecryptedPacket>
get() = nopStreamsException()
override val incomingAudioFrames: Flow<Pair<UInt, AudioFrame>>
get() = nopStreamsException()
override val incomingUserStreams: Flow<Pair<Snowflake, AudioFrame>>
get() = nopStreamsException()
override val ssrcToUser: Map<UInt, Snowflake>
get() = nopStreamsException()

@Suppress("NOTHING_TO_INLINE")
private inline fun nopStreamsException(): Nothing =
throw NotImplementedError("NOP implementation being used, try to enable voice receiving.")
}
40 changes: 40 additions & 0 deletions voice/src/main/kotlin/streams/Streams.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package dev.kord.voice.streams

import dev.kord.common.annotation.KordVoice
import dev.kord.common.entity.Snowflake
import dev.kord.voice.AudioFrame
import dev.kord.voice.udp.AudioPacket
import kotlinx.coroutines.flow.Flow


/**
* A representation of receiving voice through Discord and different stages of processing.
*/
@KordVoice
interface Streams {
/**
* An encryption key used for decryption of Discord packets.
*/
var key: ByteArray?

/**
* A flow of all incoming [dev.kord.voice.udp.AudioPacket.DecryptedPacket]s through the UDP connection.
*/
val incomingAudioPackets: Flow<AudioPacket.DecryptedPacket>

/**
* A flow of all incoming [AudioFrame]s mapped to their [ssrc][UInt].
*/
val incomingAudioFrames: Flow<Pair<UInt, AudioFrame>>

/**
* A flow of all incoming [AudioFrame]s mapped to their [userId][Snowflake].
* Streams for every user should be built over time and will not be immediately available.
*/
val incomingUserStreams: Flow<Pair<Snowflake, AudioFrame>>

/**
* A map of [ssrc][UInt]s to their corresponding [userId][Snowflake].
*/
val ssrcToUser: Map<UInt, Snowflake>
}

0 comments on commit 42e0ee4

Please sign in to comment.