Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Frame Interceptor Rewrite #427

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions voice/src/main/kotlin/DefaultFrameInterceptor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package dev.kord.voice

import dev.kord.common.annotation.KordVoice
import dev.kord.voice.gateway.SendSpeaking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach

@KordVoice
/**
* Data that is used to configure for the lifetime of a [DefaultFrameInterceptor].
*
* @param speakingState the [SpeakingFlags] to be sent when there audio being sent. By default, it is [microphone-only][SpeakingFlag.Microphone].
*/
public data class DefaultFrameInterceptorData(
val speakingState: SpeakingFlags = SpeakingFlags { +SpeakingFlag.Microphone }
)

private const val FRAMES_OF_SILENCE_TO_PLAY = 5

@KordVoice
/**
* The default implementation for [FrameInterceptor].
* Any custom implementation should extend this and call the super [intercept] method, or else
* the speaking flags will not be sent!
*
* @param data the data to configure this instance with.
*/
public class DefaultFrameInterceptor(private val data: DefaultFrameInterceptorData = DefaultFrameInterceptorData()) :
FrameInterceptor {
override fun Flow<AudioFrame?>.intercept(configuration: FrameInterceptorConfiguration): Flow<AudioFrame?> {
var framesOfSilence = 5
var isSpeaking = false

suspend fun startSpeaking() {
isSpeaking = true
configuration.voiceGateway.send(SendSpeaking(data.speakingState, 0, configuration.ssrc))
}

suspend fun stopSpeaking() {
isSpeaking = false
configuration.voiceGateway.send(SendSpeaking(SpeakingFlags(0), 0, configuration.ssrc))
}

return map { frame ->
when (framesOfSilence) {
0 -> frame
else -> frame ?: AudioFrame.SILENCE
}
}.onEach { frame ->
if (frame != null && !isSpeaking) {
startSpeaking()
} else if (frame == null) {
if (--framesOfSilence == 0)
stopSpeaking()
} else {
framesOfSilence = FRAMES_OF_SILENCE_TO_PLAY
}
}
}
}
74 changes: 4 additions & 70 deletions voice/src/main/kotlin/FrameInterceptor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,88 +2,22 @@ package dev.kord.voice

import dev.kord.common.annotation.KordVoice
import dev.kord.gateway.Gateway
import dev.kord.voice.gateway.SendSpeaking
import dev.kord.voice.gateway.VoiceGateway
import kotlin.properties.Delegates
import kotlinx.coroutines.flow.Flow

/**
* Variables that are accessible to any FrameInterceptor through the [VoiceConnection.frameInterceptorFactory].
*
* @param gateway the gateway that handles the guild this voice connection is connected to.
* @param voiceGateway the underlying [VoiceGateway].
* @param ssrc the current SSRC retrieved from Discord.
*/
@KordVoice
public data class FrameInterceptorContext(
public data class FrameInterceptorConfiguration(
val gateway: Gateway,
val voiceGateway: VoiceGateway,
val ssrc: UInt,
val ssrc: UInt
)

@KordVoice
public class FrameInterceptorContextBuilder(public var gateway: Gateway, public var voiceGateway: VoiceGateway) {
public var ssrc: UInt by Delegates.notNull()

public fun build(): FrameInterceptorContext = FrameInterceptorContext(gateway, voiceGateway, ssrc)
}

@KordVoice
internal inline fun FrameInterceptorContext(gateway: Gateway, voiceGateway: VoiceGateway, builder: FrameInterceptorContextBuilder.() -> Unit) =
FrameInterceptorContextBuilder(gateway, voiceGateway).apply(builder).build()

/**
* An interceptor for audio frames before they are sent as packets.
*
* @see DefaultFrameInterceptor
*/
@KordVoice
public fun interface FrameInterceptor {
public suspend fun intercept(frame: AudioFrame?): AudioFrame?
}

private const val FRAMES_OF_SILENCE_TO_PLAY = 5

/**
* The default implementation for [FrameInterceptor].
* Any custom implementation should extend this and call the super [intercept] method, or else
* the speaking flags will not be sent!
*
* @param context the context for this interceptor.
* @param speakingState the speaking state that will be used when there is audio data to be sent. By default, it is microphone-only.
*/
@KordVoice
public open class DefaultFrameInterceptor(
protected val context: FrameInterceptorContext,
private val speakingState: SpeakingFlags = SpeakingFlags { +SpeakingFlag.Microphone }
) : FrameInterceptor {
private val voiceGateway = context.voiceGateway

private var framesOfSilence = 5
private var isSpeaking = false

private val nowSpeaking = SendSpeaking(speakingState, 0, context.ssrc)
private val notSpeaking = SendSpeaking(SpeakingFlags(0), 0, context.ssrc)

override suspend fun intercept(frame: AudioFrame?): AudioFrame? {
if (frame != null || framesOfSilence > 0) { // is there something to process
if (!isSpeaking && frame != null) { // if there is audio make sure we are speaking
isSpeaking = true
voiceGateway.send(nowSpeaking)
}

if (frame == null) { // if we don't have audio then make sure we know that we are sending a frame of silence
if (--framesOfSilence == 0) { // we're done with frames of silence if we hit zero
isSpeaking = false
voiceGateway.send(notSpeaking)
}
}
else if (framesOfSilence != FRAMES_OF_SILENCE_TO_PLAY) {
framesOfSilence = FRAMES_OF_SILENCE_TO_PLAY // we're playing audio, lets reset the frames of silence.
}

return frame ?: AudioFrame.SILENCE
}

return frame
}
public fun Flow<AudioFrame?>.intercept(configuration: FrameInterceptorConfiguration): Flow<AudioFrame?>
}
4 changes: 2 additions & 2 deletions voice/src/main/kotlin/VoiceConnection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public data class VoiceConnectionData(
* @param data the data representing this [VoiceConnection].
* @param voiceGatewayConfiguration the configuration used on each new [connect] for the [voiceGateway].
* @param audioProvider a [AudioProvider] that will provide [AudioFrame] when required.
* @param frameInterceptor a [FrameInterceptor] that will intercept all outgoing [AudioFrame]s.
* @param frameSender the [AudioFrameSender] that will handle the sending of audio packets.
* @param nonceStrategy the [NonceStrategy] that is used during encryption of audio.
* @param frameInterceptorFactory a factory for [FrameInterceptor]s that is used whenever audio is ready to be sent. See [FrameInterceptor] and [DefaultFrameInterceptor].
*/
@KordVoice
public class VoiceConnection(
Expand All @@ -52,9 +52,9 @@ public class VoiceConnection(
public var voiceGatewayConfiguration: VoiceGatewayConfiguration,
public val streams: Streams,
public val audioProvider: AudioProvider,
public val frameInterceptor: FrameInterceptor,
public val frameSender: AudioFrameSender,
public val nonceStrategy: NonceStrategy,
public val frameInterceptorFactory: (FrameInterceptorContext) -> FrameInterceptor,
) {
public val scope: CoroutineScope =
CoroutineScope(SupervisorJob() + CoroutineName("kord-voice-connection[${data.guildId.value}]"))
Expand Down
46 changes: 26 additions & 20 deletions voice/src/main/kotlin/VoiceConnectionBuilder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ public class VoiceConnectionBuilder(
*/
public var audioProvider: AudioProvider? = null

public fun audioProvider(provider: AudioProvider) {
this.audioProvider = provider
}

/**
* The [FrameInterceptor] for this [VoiceConnection].
* If `null`, [DefaultFrameInterceptor] will be used.
*/
public var frameInterceptor: FrameInterceptor? = null

public fun frameInterceptor(frameInterceptor: FrameInterceptor) {
this.frameInterceptor = frameInterceptor
}

/**
* The [dev.kord.voice.udp.AudioFrameSender] for this [VoiceConnection]. If null, [dev.kord.voice.udp.DefaultAudioFrameSender]
* will be used.
Expand All @@ -54,21 +68,6 @@ public class VoiceConnectionBuilder(
*/
public var nonceStrategy: NonceStrategy? = null

public fun audioProvider(provider: AudioProvider) {
this.audioProvider = provider
}

/**
* The [FrameInterceptor] factory for this [VoiceConnection].
* When one is not set, a factory will be used to create the default interceptor, see [DefaultFrameInterceptor].
* This factory will be used to create a new [FrameInterceptor] whenever audio is ready to be sent.
*/
public var frameInterceptorFactory: ((FrameInterceptorContext) -> FrameInterceptor)? = null

public fun frameInterceptor(factory: (FrameInterceptorContext) -> FrameInterceptor) {
this.frameInterceptorFactory = factory
}

/**
* A boolean indicating whether your voice state will be muted.
*/
Expand Down Expand Up @@ -160,10 +159,17 @@ public class VoiceConnectionBuilder(
.build()
val udpSocket = udpSocket ?: GlobalVoiceUdpSocket
val audioProvider = audioProvider ?: EmptyAudioPlayerProvider
val audioSender =
audioSender ?: DefaultAudioFrameSender(DefaultAudioFrameSenderData(udpSocket))
val nonceStrategy = nonceStrategy ?: LiteNonceStrategy()
val frameInterceptorFactory = frameInterceptorFactory ?: { DefaultFrameInterceptor(it) }
val frameInterceptor = frameInterceptor ?: DefaultFrameInterceptor()
val audioSender =
audioSender ?: DefaultAudioFrameSender(
DefaultAudioFrameSenderData(
udpSocket,
frameInterceptor,
audioProvider,
nonceStrategy
)
)
val streams =
streams ?: if (receiveVoice) DefaultStreams(voiceGateway, udpSocket, nonceStrategy) else NOPStreams

Expand All @@ -175,9 +181,9 @@ public class VoiceConnectionBuilder(
initialGatewayConfiguration,
streams,
audioProvider,
frameInterceptor,
audioSender,
nonceStrategy,
frameInterceptorFactory,
nonceStrategy
)
}

Expand Down
9 changes: 3 additions & 6 deletions voice/src/main/kotlin/handlers/UdpLifeCycleHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package dev.kord.voice.handlers

import dev.kord.common.annotation.KordVoice
import dev.kord.voice.EncryptionMode
import dev.kord.voice.FrameInterceptorContextBuilder
import dev.kord.voice.FrameInterceptorConfiguration
import dev.kord.voice.VoiceConnection
import dev.kord.voice.encryption.strategies.LiteNonceStrategy
import dev.kord.voice.encryption.strategies.NormalNonceStrategy
Expand Down Expand Up @@ -64,11 +64,8 @@ internal class UdpLifeCycleHandler(
val config = AudioFrameSenderConfiguration(
ssrc = ssrc!!,
key = it.secretKey.toUByteArray().toByteArray(),
nonceStrategy = nonceStrategy,
provider = audioProvider,
baseFrameInterceptorContext = FrameInterceptorContextBuilder(gateway, voiceGateway),
interceptorFactory = frameInterceptorFactory,
server = server!!
server = server!!,
interceptorConfiguration = FrameInterceptorConfiguration(gateway, voiceGateway, ssrc!!)
)

audioSenderJob?.cancel()
Expand Down
11 changes: 2 additions & 9 deletions voice/src/main/kotlin/udp/AudioFrameSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,15 @@
package dev.kord.voice.udp

import dev.kord.common.annotation.KordVoice
import dev.kord.voice.AudioProvider
import dev.kord.voice.FrameInterceptor
import dev.kord.voice.FrameInterceptorContext
import dev.kord.voice.FrameInterceptorContextBuilder
import dev.kord.voice.encryption.strategies.NonceStrategy
import dev.kord.voice.FrameInterceptorConfiguration
import io.ktor.util.network.*

@KordVoice
public data class AudioFrameSenderConfiguration(
val server: NetworkAddress,
val ssrc: UInt,
val key: ByteArray,
val nonceStrategy: NonceStrategy,
val provider: AudioProvider,
val baseFrameInterceptorContext: FrameInterceptorContextBuilder,
val interceptorFactory: (FrameInterceptorContext) -> FrameInterceptor
val interceptorConfiguration: FrameInterceptorConfiguration
)

@KordVoice
Expand Down
36 changes: 18 additions & 18 deletions voice/src/main/kotlin/udp/DefaultAudioFrameSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package dev.kord.voice.udp

import dev.kord.common.annotation.KordVoice
import dev.kord.voice.AudioFrame
import dev.kord.voice.AudioProvider
import dev.kord.voice.FrameInterceptor
import dev.kord.voice.encryption.strategies.NonceStrategy
import io.ktor.network.sockets.*
import io.ktor.utils.io.core.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import mu.KotlinLogging
import kotlin.random.Random
Expand All @@ -15,39 +18,36 @@ private val audioFrameSenderLogger = KotlinLogging.logger { }

@KordVoice
public data class DefaultAudioFrameSenderData(
val udp: VoiceUdpSocket
val udp: VoiceUdpSocket,
val interceptor: FrameInterceptor,
val provider: AudioProvider,
val nonceStrategy: NonceStrategy,
)

@KordVoice
public class DefaultAudioFrameSender(
public val data: DefaultAudioFrameSenderData
) : AudioFrameSender {
private fun createFrameInterceptor(configuration: AudioFrameSenderConfiguration): FrameInterceptor =
with(configuration) {
val builder = baseFrameInterceptorContext
builder.ssrc = ssrc
return interceptorFactory(builder.build()) // we should assume that everything else is set before-hand in the base builder
}

override suspend fun start(configuration: AudioFrameSenderConfiguration): Unit = coroutineScope {
val interceptor: FrameInterceptor = createFrameInterceptor(configuration)
var sequence: UShort = Random.nextBits(UShort.SIZE_BITS).toUShort()

val packetProvider = DefaultAudioPackerProvider(configuration.key, configuration.nonceStrategy)
val packetProvider = DefaultAudioPackerProvider(configuration.key, data.nonceStrategy)

val frames = Channel<AudioFrame?>(Channel.RENDEZVOUS)
with(configuration.provider) { launch { provideFrames(frames) } }
with(data.provider) { launch { provideFrames(frames) } }

audioFrameSenderLogger.trace { "audio poller starting." }

try {
for (frame in frames) {
val consumedFrame = interceptor.intercept(frame) ?: continue
val packet = packetProvider.provide(sequence, sequence * 960u, configuration.ssrc, consumedFrame.data)

data.udp.send(Datagram(ByteReadPacket(packet.data, packet.dataStart, packet.viewSize), configuration.server))

sequence++
with(data.interceptor) {
frames.consumeAsFlow()
.intercept(configuration.interceptorConfiguration)
.filterNotNull()
.map { packetProvider.provide(sequence, sequence * 960u, configuration.ssrc, it.data) }
.map { Datagram(ByteReadPacket(it.data, it.dataStart, it.viewSize), configuration.server) }
.onEach(data.udp::send)
.onEach { sequence++ }
.collect()
}
} catch (e: Exception) {
audioFrameSenderLogger.trace(e) { "poller stopped with reason" }
Expand Down