-
Notifications
You must be signed in to change notification settings - Fork 82
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for receiving voice and opening up the voice api #386
Add support for receiving voice and opening up the voice api #386
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In addition to my comments, I was wondering whether you can disable receiving voice if you don't need it.
voice/src/main/kotlin/Streams.kt
Outdated
SupervisorJob() + dispatcher + CoroutineName("Voice Connection Incoming Streams") | ||
|
||
// this will be set before it is used as the key is received before the udp connection is even established | ||
internal lateinit var key: ByteArray |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not thread safe, there is no happens-before relationship between setting and using key
.
-> @Volatile
or atomic
?
voice/src/main/kotlin/Streams.kt
Outdated
private val _ssrcToUser: MutableMap<UInt, Snowflake> = mutableMapOf() | ||
|
||
/** | ||
* A mapping of ssrc to user id. | ||
* This cache is built over time through the [dev.kord.voice.gateway.VoiceGateway]. | ||
*/ | ||
val ssrcToUser: Map<UInt, Snowflake> get() = _ssrcToUser |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not thread safe -> ConcurrentHashMap
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not something we can do easily if we want to go multiplatform. next point should be enough
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Multiplatform could use some sort of expect and actual with ConcurrentHashMap
on JVM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still the same problem, we'd have to find different implementations for each platform. The actual mutablemap is private, and no other unsafe operations are being done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, the same would be true for other caches. Problem is that this map could be edited and accessed from any thread, so there is no guarantee for changes to become visible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The multiplatform solution is to make the variable atomic and interactions with the map sequential. You can achieve the latter by consuming the flow directly without launching.
voice/src/main/kotlin/Streams.kt
Outdated
if (ssrcToUser[ssrc] == null) { | ||
_ssrcToUser[ssrc] = userId |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
computeIfAbsent
(especially with ConcurrentHashMap
)
* Convenience method that will invoke the [consumer] on every event [T] created by [VoiceGateway.events]. | ||
* | ||
* The events are buffered in an [unlimited][Channel.UNLIMITED] [buffer][Flow.buffer] and | ||
* [launched][CoroutineScope.launch] in the supplied [scope], which is [Gateway] by default. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be [VoiceGateway]
instead of [Gateway]
in the documentation comment
) : EventHandler<VoiceEvent>(flow, "UdpInterceptor") { | ||
private var framePollerConfigurationBuilder = AudioFramePollerConfigurationBuilder() | ||
private var udp: DiscordUdpConnection? = null | ||
private var ssrc by Delegates.notNull<UInt>() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not thread safe, there is no happens-before relationship between setting and using ssrc
.
-> @Volatile
or atomic
?
} catch (e: Exception) { | ||
/* we're done polling, nothing to worry about */ | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where does the Exception
come from?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
anything that could cause the flow to terminate.
kord follows this same procedure in DefaultGatway and DefaultVoiceGateway
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, just me beeing curious
@OptIn(ExperimentalIoApi::class) | ||
private val header = BytePacketBuilder().also { | ||
val header = BytePacketBuilder().also { |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
header.copyInto(nonce, 0, 0, RTP_HEADER_LENGTH) | ||
val decrypted = XSalsa20Poly1305Codec.decrypt(data, key, nonce) ?: error("couldn't decrypt audio data") | ||
return DecryptedPacket(sequence, timestamp, ssrc, decrypted) | ||
} | ||
} | ||
|
||
fun asByteReadPacket() = BytePacketBuilder().also { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
apply
instead of also
?
sequence: UShort, | ||
timestamp: UInt, | ||
ssrc: UInt, | ||
encryptedData: ByteArray |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be decryptedData
instead of encryptedData
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice catch
private lateinit var socket: ConnectedDatagramSocket | ||
private lateinit var configuration: VoiceUdpConnectionConfiguration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not thread safe, there is no happens-before relationship between setting and using socket
and configuration
(between start
, discoverIp
and send
).
-> @Volatile
or atomic
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The lateinit socket is something done in Gateway and VoiceGateway, I think it would be best to rework the concurrency structure of each gateway/connection in its own pr, which I think is being done.
i don't mind making configuration atomic, however.
do you have an idea in mind how disabling/enabling receiving voice would look like? |
Maybe a |
41c4300
to
c3537d5
Compare
voice/src/main/kotlin/Streams.kt
Outdated
private val _incomingUserAudioFrames: MutableSharedFlow<Pair<Snowflake, AudioFrame>> = MutableSharedFlow() | ||
|
||
/** | ||
* A flow of incoming [AudioFrame]s mapped to its corresponding user id. Streams for each user are built over time, | ||
* or whenever the [dev.kord.voice.gateway.VoiceGateway] receives a [Speaking] event. | ||
*/ | ||
val incomingUserStreams: SharedFlow<Pair<Snowflake, AudioFrame>> = _incomingUserAudioFrames |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
incomingUserStreams
could also use _incomingUserAudioFrames.asSharedFlow()
to forbid casting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This pattern is used a few other places throughout Kord, is there any other benefit to use asSharedFlow
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing I know about, this is not a must (but I thought I could mention it).
inline fun <reified T : VoiceEvent> VoiceGateway.on( | ||
scope: CoroutineScope = this, | ||
crossinline consumer: suspend T.() -> Unit | ||
): Job { | ||
return this.events.buffer(Channel.UNLIMITED).filterIsInstance<T>().onEach { | ||
launch { it.runCatching { it.consumer() }.onFailure(voiceGatewayOnLogger::error) } | ||
}.launchIn(scope) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not launch inside onEach
to make sure that changes to the MutableMap
in Streams
are sequential (see @BartArys comment). Other solution would be to not use VoiceGateway.on()
in Streams
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function should stay the same, as it would be confusing that the VoiceGateway.on functions differently from Gateway.on
Though I now understand what you meant by not using launch, I did not realize it was used here :p
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
connection.voiceGateway.events.buffer(Channel.UNLIMITED).filterIsInstance<Speaking>().onEach { speaking ->
// ...
}.launchIn(this)
in init
for Streams
should do the trick then
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mmhm
private val _incoming: MutableSharedFlow<ByteReadPacket> = MutableSharedFlow() | ||
override val incoming: SharedFlow<ByteReadPacket> = _incoming |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
incoming
could also use _incoming.asSharedFlow()
to forbid casting
package dev.kord.voice.handlers | ||
|
||
import dev.kord.common.annotation.KordVoice | ||
import dev.kord.voice.Streams |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wrong package, won't compile (should import dev.kord.voice.streams.Streams
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
intellij cannot refactor packages apparently, or properly at least :-(
@KordVoice | ||
abstract class Streams { | ||
/** | ||
* An encryption key used for decryption of Discord packets. | ||
*/ | ||
internal abstract var key: ByteArray? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
abstract class with internal abstract member cannot be inherited outside of voice module
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
was a bit too easy i guess :p
025e481
to
42e0ee4
Compare
it seems like discord uses much more of the rtp spec than implemented before, and it differs from voice server to voice server. time to implement that |
b6aee63
to
e3a879d
Compare
there may or may not be a memory leak that may or may not have to be fixed. |
for what reason? |
Seems like there are actually two memory leaks, one major, and one much, much more minor. |
Signed-off-by: Lost <[email protected]>
c0e5c0b
to
47ebd69
Compare
Declaring the version catalog in libs.versions.toml instead of settings.gradle.kts has the advantage that it can be reused in buildSrc. It also introduces a single place to define dependencies and their version. General cleanup of gradle files was also done, including: * using default output for dokka: relying on defaults means less config and maintenance burden * removing DocsTask because it was unused, last usage of this custom task was removed in #367 * getting rid of source set workaround introduced in #386 by moving TweetNaclFast from voice/src/main/kotlin to voice/src/main/java
Declaring the version catalog in libs.versions.toml instead of settings.gradle.kts has the advantage that it can be reused in buildSrc. It also introduces a single place to define dependencies and their version. General cleanup of gradle files was also done, including: * using default output for dokka: relying on defaults means less config and maintenance burden * removing DocsTask because it was unused, last usage of this custom task was removed in kordlib#367 * getting rid of the source set workaround introduced in kordlib#386 by moving TweetNaclFast from voice/src/main/kotlin to voice/src/main/java
Declaring the version catalog in libs.versions.toml instead of settings.gradle.kts has the advantage that it can be reused in buildSrc. It also introduces a single place to define dependencies and their version. General cleanup of gradle files was also done, including: * using default output for dokka: relying on defaults means less config and maintenance burden * removing DocsTask because it was unused, last usage of this custom task was removed in #367 * getting rid of the source set workaround introduced in #386 by moving TweetNaclFast from voice/src/main/kotlin to voice/src/main/java * removing io.codearte.nexus-staging plugin introduced in #154 since the tasks of the plugin were never used (Nexus GUI was used manually instead)
All incoming packets are abstracted away into the
Streams
class where it provides a few different flows in different states of processing.Attempted to open up the api.