From 65d9a423f81ad10b0f55b4ebc91988c974959b2b Mon Sep 17 00:00:00 2001 From: Idan Sheinberg Date: Sun, 20 Sep 2020 16:09:05 +0300 Subject: [PATCH] kotlin coroutines test finished --- knitter/build.gradle | 4 +- .../knitter/coroutines/PinnedDispatchers.kt | 53 ++++++------- .../coroutines/PinnedDispatchersTest.kt | 79 +++++++++++++++++++ 3 files changed, 106 insertions(+), 30 deletions(-) create mode 100644 knitter/src/test/kotlin/org/sheinbergon/needle/knitter/coroutines/PinnedDispatchersTest.kt diff --git a/knitter/build.gradle b/knitter/build.gradle index e074e2f..e1dd152 100644 --- a/knitter/build.gradle +++ b/knitter/build.gradle @@ -5,10 +5,12 @@ plugins { dependencies { api project(':needle-core') implementation project(':needle-concurrent') - testImplementation project(':needle-core').sourceSets.test.output implementation "org.jetbrains.kotlin:kotlin-stdlib" compileOnly 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9' + + testImplementation project(':needle-core').sourceSets.test.output + testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9' } task sourcesJar(type: Jar, dependsOn: classes) { diff --git a/knitter/src/main/kotlin/org/sheinbergon/needle/knitter/coroutines/PinnedDispatchers.kt b/knitter/src/main/kotlin/org/sheinbergon/needle/knitter/coroutines/PinnedDispatchers.kt index cedb532..6f81845 100644 --- a/knitter/src/main/kotlin/org/sheinbergon/needle/knitter/coroutines/PinnedDispatchers.kt +++ b/knitter/src/main/kotlin/org/sheinbergon/needle/knitter/coroutines/PinnedDispatchers.kt @@ -1,7 +1,6 @@ package org.sheinbergon.needle.knitter.coroutines import kotlinx.coroutines.CoroutineDispatcher -import kotlinx.coroutines.Runnable import kotlinx.coroutines.asCoroutineDispatcher import org.sheinbergon.needle.AffinityDescriptor import org.sheinbergon.needle.concurrent.FixedAffinityPinnedThreadFactory @@ -9,42 +8,38 @@ import org.sheinbergon.needle.concurrent.GovernedAffinityPinnedThreadFactory import org.sheinbergon.needle.concurrent.PinnedThreadPoolExecutor import kotlin.coroutines.CoroutineContext -object PinnedDispatchers { +private const val `1` = 1 - private const val `1` = 1 +private class GovernedAffinityDelegatingDispatcher( + parallelism: Int, + affinity: AffinityDescriptor +) : GovernedAffinityDispatcher() { - private class GovernedAffinityDelegatingDispatcher( - parallelism: Int, - affinity: AffinityDescriptor - ) : GovernedAffinityDispatcher() { + val factory: GovernedAffinityPinnedThreadFactory = GovernedAffinityPinnedThreadFactory(affinity) - val factory: GovernedAffinityPinnedThreadFactory + private val delegate: CoroutineDispatcher - val delegate: CoroutineDispatcher - - init { - factory = GovernedAffinityPinnedThreadFactory(affinity) - val executor = PinnedThreadPoolExecutor.newFixedPinnedThreadPool(parallelism, factory) - delegate = executor.asCoroutineDispatcher() - } + init { + val executor = PinnedThreadPoolExecutor.newFixedPinnedThreadPool(parallelism, factory) + delegate = executor.asCoroutineDispatcher() + } - override fun alter(affinity: AffinityDescriptor) = factory.alter(affinity, true) + override fun alter(affinity: AffinityDescriptor) = factory.alter(affinity, true) - override fun dispatch(context: CoroutineContext, block: Runnable) = delegate.dispatch(context, block) - } + override fun dispatch(context: CoroutineContext, block: Runnable) = delegate.dispatch(context, block) +} - fun governedAffinitySingleThread(affinity: AffinityDescriptor): GovernedAffinityDispatcher = - governedAffinityThreadPool(`1`, affinity) +fun governedAffinitySingleThread(affinity: AffinityDescriptor): GovernedAffinityDispatcher = + governedAffinityThreadPool(`1`, affinity) - fun governedAffinityThreadPool(parallelism: Int, affinity: AffinityDescriptor): GovernedAffinityDispatcher = - GovernedAffinityDelegatingDispatcher(parallelism, affinity) +fun governedAffinityThreadPool(parallelism: Int, affinity: AffinityDescriptor): GovernedAffinityDispatcher = + GovernedAffinityDelegatingDispatcher(parallelism, affinity) - fun fixedAffinitySingleThread(affinity: AffinityDescriptor) = - fixedAffinityThreadPool(`1`, affinity) +fun fixedAffinitySingleThread(affinity: AffinityDescriptor) = + fixedAffinityThreadPool(`1`, affinity) - fun fixedAffinityThreadPool(parallelism: Int, affinity: AffinityDescriptor): CoroutineDispatcher { - val factory = FixedAffinityPinnedThreadFactory(affinity) - val executor = PinnedThreadPoolExecutor.newFixedPinnedThreadPool(parallelism, factory) - return executor.asCoroutineDispatcher() - } +fun fixedAffinityThreadPool(parallelism: Int, affinity: AffinityDescriptor): CoroutineDispatcher { + val factory = FixedAffinityPinnedThreadFactory(affinity) + val executor = PinnedThreadPoolExecutor.newFixedPinnedThreadPool(parallelism, factory) + return executor.asCoroutineDispatcher() } diff --git a/knitter/src/test/kotlin/org/sheinbergon/needle/knitter/coroutines/PinnedDispatchersTest.kt b/knitter/src/test/kotlin/org/sheinbergon/needle/knitter/coroutines/PinnedDispatchersTest.kt new file mode 100644 index 0000000..591a950 --- /dev/null +++ b/knitter/src/test/kotlin/org/sheinbergon/needle/knitter/coroutines/PinnedDispatchersTest.kt @@ -0,0 +1,79 @@ +package org.sheinbergon.needle.knitter.coroutines + +import kotlinx.coroutines.* +import org.amshove.kluent.shouldBeEqualTo +import org.amshove.kluent.shouldBeLessOrEqualTo +import org.junit.jupiter.api.Test +import org.sheinbergon.needle.* + +class PinnedDispatchersTest { + + @Test + fun `Fixed affinity single threaded dispatcher`() { + val dispatcher = fixedAffinitySingleThread(testAffinityDescriptor) + val deferred = deferredAffinitySingleAsync(dispatcher) + runBlocking { blockingAssertSingle(deferred, binaryTestMask, textTestMask) } + } + + @Test + fun `Fixed affinity thread-pool dispatcher`() { + val dispatcher = fixedAffinityThreadPool(availableCores, negatedTestAffinityDescriptor) + val deferred = deferredAffinityPoolAsync(availableCores, dispatcher) + runBlocking { blockingAssertPool(availableCores, deferred, negatedBinaryTestMask, negatedTextTestMask) } + } + + @Test + fun `Governed affinity single threaded dispatcher`() { + val dispatcher = governedAffinitySingleThread(testAffinityDescriptor) + val deferred1 = deferredAffinitySingleAsync(dispatcher) + runBlocking { blockingAssertSingle(deferred1, binaryTestMask, textTestMask) } + dispatcher.alter(negatedTestAffinityDescriptor) + val deferred2 = deferredAffinitySingleAsync(dispatcher) + runBlocking { blockingAssertSingle(deferred2, negatedBinaryTestMask, negatedTextTestMask) } + } + + @Test + fun `Governed affinity thread-pool dispatcher`() { + val dispatcher = governedAffinityThreadPool(availableCores, negatedTestAffinityDescriptor) + val deferred1 = deferredAffinityPoolAsync(availableCores, dispatcher) + runBlocking { blockingAssertPool(availableCores, deferred1, negatedBinaryTestMask, negatedTextTestMask) } + dispatcher.alter(testAffinityDescriptor) + val deferred2 = deferredAffinityPoolAsync(availableCores, dispatcher) + runBlocking { blockingAssertPool(availableCores, deferred2, binaryTestMask, textTestMask) } + } + + private fun deferredAffinitySingleAsync(dispatcher: CoroutineDispatcher) = + GlobalScope.async(dispatcher) { Needle.affinity() } + + private fun deferredAffinityPoolAsync(cores: Int, dispatcher: CoroutineDispatcher) = (`1`..cores) + .map { + GlobalScope.async(dispatcher) { + Thread.currentThread() to Needle.affinity() + } + } + + private suspend fun blockingAssertSingle( + deferred: Deferred, + binaryMask: Long, + textMask: String + ) { + val affinity = deferred.await() + affinity.mask() shouldBeEqualTo binaryMask + affinity.toString() shouldBeEqualTo textMask + } + + private suspend fun blockingAssertPool( + cores: Int, + deferred: List>>, + binaryMask: Long, + textMask: String + ) { + val results = deferred.awaitAll() + val threads = results.mapTo(mutableSetOf(), Pair::first) + threads.size shouldBeLessOrEqualTo cores + results.forEach { (_, affinity) -> + affinity.mask() shouldBeEqualTo binaryMask + affinity.toString() shouldBeEqualTo textMask + } + } +}