Skip to content

Commit

Permalink
better concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
lost-illusi0n committed Sep 7, 2021
1 parent bc16224 commit 41c4300
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 26 deletions.
39 changes: 22 additions & 17 deletions voice/src/main/kotlin/Streams.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ import dev.kord.common.entity.Snowflake
import dev.kord.voice.gateway.Speaking
import dev.kord.voice.gateway.on
import dev.kord.voice.udp.AudioPacket
import kotlinx.atomicfu.AtomicArray
import kotlinx.coroutines.*
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext

@KordVoice
Expand All @@ -19,8 +23,7 @@ class Streams(
override val coroutineContext: CoroutineContext =
SupervisorJob() + dispatcher + CoroutineName("Voice Connection Incoming Streams")

// this will be set before it is used as the key is received before the udp connection is even established
internal val key: AtomicReference<ByteArray> = AtomicReference()
internal val key: AtomicRef<ByteArray?> = atomic(null)

/**
* A flow of all incoming [dev.kord.voice.udp.AudioPacket.DecryptedPacket]s through the UDP connection.
Expand All @@ -29,7 +32,7 @@ class Streams(
connection.udp
.incoming
.mapNotNull(AudioPacket::encryptedFrom)
.map { it.decrypt(key.get()) }
.map { it.decrypt(key.value!!) }
.shareIn(this, SharingStarted.Lazily)

/**
Expand All @@ -45,26 +48,28 @@ class Streams(
*/
val incomingUserStreams: SharedFlow<Pair<Snowflake, AudioFrame>> = _incomingUserAudioFrames

private val _ssrcToUser: MutableMap<UInt, Snowflake> = mutableMapOf()
private val _ssrcToUser: AtomicRef<MutableMap<UInt, Snowflake>> = atomic(mutableMapOf())

/**
* A mapping of ssrc to user id.
* This cache is built over time through the [dev.kord.voice.gateway.VoiceGateway].
*/
val ssrcToUser: Map<UInt, Snowflake> get() = _ssrcToUser
val ssrcToUser: Map<UInt, Snowflake> by _ssrcToUser

init {
connection.voiceGateway.on<Speaking> {
_ssrcToUser.computeIfAbsent(ssrc) {
launch {
_incomingUserAudioFrames.emitAll(
incomingAudioFrames
.filter { (ssrc, _) -> ssrc == this@on.ssrc }
.map { (_, frame) -> userId to frame }
)
connection.voiceGateway.on<Speaking>(scope = this) {
_ssrcToUser.update {
it.computeIfAbsent(ssrc) {
incomingAudioFrames
.filter { (ssrc, _) -> ssrc == this@on.ssrc }
.map { (_, frame) -> userId to frame }
.onEach { _incomingUserAudioFrames.emit(it) }
.launchIn(this@Streams)

userId
}

userId
it
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions voice/src/main/kotlin/handlers/UdpLifeCycleHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import dev.kord.voice.gateway.VoiceEvent
import dev.kord.voice.udp.AudioFrameSenderConfiguration
import dev.kord.voice.udp.VoiceUdpConnectionConfiguration
import io.ktor.util.network.*
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.flow.Flow
import mu.KotlinLogging
import java.util.concurrent.atomic.AtomicReference
import kotlin.properties.Delegates

private val udpLifeCycleLogger = KotlinLogging.logger { }

Expand All @@ -25,12 +25,12 @@ internal class UdpLifeCycleHandler(
flow: Flow<VoiceEvent>,
private val connection: VoiceConnection
) : EventHandler<VoiceEvent>(flow, "UdpInterceptor") {
private val ssrc: AtomicReference<UInt> = AtomicReference()
private val ssrc: AtomicRef<UInt?> = atomic(null)

@OptIn(ExperimentalUnsignedTypes::class)
override fun start() {
on<Ready> {
this.ssrc.set(it.ssrc)
this.ssrc.value = it.ssrc

connection.udp.start(VoiceUdpConnectionConfiguration(NetworkAddress(it.ip, it.port), it.ssrc))

Expand All @@ -53,7 +53,7 @@ internal class UdpLifeCycleHandler(
on<SessionDescription> {
with(connection) {
val config = AudioFrameSenderConfiguration(
ssrc = ssrc.get(),
ssrc = ssrc.value!!,
key = it.secretKey.toUByteArray().toByteArray(),
provider = audioProvider,
baseFrameInterceptorContext = FrameInterceptorContextBuilder(gateway, voiceGateway),
Expand Down
9 changes: 5 additions & 4 deletions voice/src/main/kotlin/udp/DefaultVoiceUdpConnection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.util.network.*
import io.ktor.utils.io.core.*
import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import mu.KotlinLogging
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext

private val udpConnectionLogger = KotlinLogging.logger { }
Expand All @@ -23,13 +24,13 @@ internal class DefaultVoiceUdpConnection(
SupervisorJob() + data.dispatcher + CoroutineName("Discord Voice UDP Connection")

private lateinit var socket: ConnectedDatagramSocket
private val configuration: AtomicReference<VoiceUdpConnectionConfiguration> = AtomicReference()
private val configuration: AtomicRef<VoiceUdpConnectionConfiguration?> = atomic(null)

private val _incoming: MutableSharedFlow<ByteReadPacket> = MutableSharedFlow()
override val incoming: SharedFlow<ByteReadPacket> = _incoming

override suspend fun start(configuration: VoiceUdpConnectionConfiguration) {
this.configuration.set(configuration)
this.configuration.value = configuration
if (::socket.isInitialized) withContext(Dispatchers.IO) { socket.close() }

socket = aSocket(ActorSelectorManager(coroutineContext)).udp().connect(configuration.server)
Expand All @@ -44,7 +45,7 @@ internal class DefaultVoiceUdpConnection(
udpConnectionLogger.trace { "discovering ip" }

send(buildPacket {
writeUInt(configuration.get().ssrc)
writeUInt(configuration.value!!.ssrc)
writeFully(ByteArray(66))
})

Expand Down

0 comments on commit 41c4300

Please sign in to comment.