Skip to content

Commit

Permalink
Merge pull request #9 from Weava/kotlin-coroutines-adapter
Browse files Browse the repository at this point in the history
Add support for Kotlin coroutines
  • Loading branch information
zhxnlai authored Jun 26, 2018
2 parents 6a790d9 + 5f9fc5c commit 5c74eff
Show file tree
Hide file tree
Showing 7 changed files with 270 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ implementation 'com.github.tinder.scarlet:scarlet:0.1.2'
`StreamAdapter.Factory`
- [x] `RxJava2`
- [x] `RxJava1`
- [ ] `Kotlin Coroutine`
- [x] `Kotlin Coroutine`

`Lifecycle`
- [x] `AndroidLifecycle`
Expand Down
2 changes: 2 additions & 0 deletions dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ ext {
rxAndroid = 'io.reactivex.rxjava2:rxandroid:2.0.1'
rxKotlin = 'io.reactivex.rxjava2:rxkotlin:2.1.0'
rxJava1 = 'io.reactivex:rxjava:1.3.4'
kotlinCoroutines = 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.23.3'
kotlinCoroutinesRxInterop = 'org.jetbrains.kotlinx:kotlinx-coroutines-reactive:0.23.3'

stetho = 'com.facebook.stetho:stetho:1.5.0'
stethoOkHttp = 'com.facebook.stetho:stetho-okhttp3:1.5.0'
Expand Down
76 changes: 76 additions & 0 deletions scarlet-stream-adapter-coroutines/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
apply plugin: 'kotlin'
apply plugin: 'java-library'
apply plugin: 'org.jetbrains.dokka'
apply plugin: 'maven-publish'

dependencies {
api rootProject.ext.kotlinCoroutines
api rootProject.ext.kotlinCoroutinesRxInterop

implementation project(':scarlet-core')
implementation rootProject.ext.kotlinStdlib

testImplementation project(':scarlet')
testImplementation project(':scarlet-websocket-mockwebserver')
testImplementation project(':scarlet-test-utils')
testImplementation rootProject.ext.junit
testImplementation rootProject.ext.mockito
testImplementation rootProject.ext.kotlinReflect
testImplementation rootProject.ext.assertJ
}

kotlin { experimental { coroutines 'enable' } }

dokka {
outputFormat = 'javadoc'
outputDirectory = "$buildDir/javadoc"
}

task sourcesJar(type: Jar, dependsOn: classes) {
classifier = 'sources'
from sourceSets.main.allSource
}

task dokkaJavadoc(type: org.jetbrains.dokka.gradle.DokkaTask) {
outputFormat = 'javadoc'
outputDirectory = javadoc.destinationDir
}

task javadocJar(type: Jar, dependsOn: dokkaJavadoc) {
classifier = 'javadoc'
from javadoc.destinationDir
}

artifacts {
archives sourcesJar, javadocJar
}

publishing {
publications {
mavenJava(MavenPublication) {
groupId 'com.tinder'
version version
artifactId project.getName()
artifact sourcesJar
artifact javadocJar
from components.java
}
}
}

artifactory {
contextUrl = 'https://tinder.jfrog.io/tinder'
publish {
repository {
repoKey = 'libs-release-local'
username = System.getenv("ARTIFACTORY_USER")
password = System.getenv("ARTIFACTORY_PASSWORD")
maven = true
}
defaults {
publications('mavenJava')
publishArtifacts = true
publishPom = true
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* © 2013 - 2018 Tinder, Inc., ALL RIGHTS RESERVED
*/

package com.tinder.streamadapter.coroutines

import com.tinder.scarlet.StreamAdapter
import com.tinder.scarlet.utils.getRawType
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import java.lang.reflect.Type

/**
* A [stream adapter factory][StreamAdapter.Factory] that uses ReceiveChannel.
*/
class CoroutinesStreamAdapterFactory : StreamAdapter.Factory {

override fun create(type: Type): StreamAdapter<Any, Any> {
return when (type.getRawType()) {
ReceiveChannel::class.java -> ReceiveChannelStreamAdapter()
else -> throw IllegalArgumentException()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* © 2013 - 2018 Tinder, Inc., ALL RIGHTS RESERVED
*/

package com.tinder.streamadapter.coroutines

import com.tinder.scarlet.Stream
import com.tinder.scarlet.StreamAdapter
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.reactive.openSubscription

class ReceiveChannelStreamAdapter<T> : StreamAdapter<T, ReceiveChannel<T>> {

override fun adapt(stream: Stream<T>) = stream.openSubscription()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* © 2013 - 2018 Tinder, Inc., ALL RIGHTS RESERVED
*/

package com.tinder.scarlet.streamadapter.coroutines

import com.tinder.scarlet.Lifecycle
import com.tinder.scarlet.Scarlet
import com.tinder.scarlet.Stream
import com.tinder.scarlet.WebSocket
import com.tinder.scarlet.lifecycle.LifecycleRegistry
import com.tinder.scarlet.testutils.TestStreamObserver
import com.tinder.scarlet.testutils.any
import com.tinder.scarlet.testutils.test
import com.tinder.scarlet.testutils.containingText
import com.tinder.scarlet.testutils.containingBytes
import com.tinder.scarlet.websocket.mockwebserver.newWebSocketFactory
import com.tinder.scarlet.websocket.okhttp.newWebSocketFactory
import com.tinder.scarlet.ws.Receive
import com.tinder.scarlet.ws.Send
import com.tinder.streamadapter.coroutines.CoroutinesStreamAdapterFactory
import kotlinx.coroutines.experimental.runBlocking
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import okhttp3.OkHttpClient
import okhttp3.mockwebserver.MockWebServer
import org.assertj.core.api.Assertions.assertThat
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.TimeUnit

class ReceiveChannelTest {

@get:Rule
private val mockWebServer = MockWebServer()
private val serverUrlString by lazy { mockWebServer.url("/").toString() }

private val serverLifecycleRegistry = LifecycleRegistry()
private lateinit var server: Service
private lateinit var serverEventObserver: TestStreamObserver<WebSocket.Event>

private val clientLifecycleRegistry = LifecycleRegistry()
private lateinit var client: Service
private lateinit var clientEventObserver: TestStreamObserver<WebSocket.Event>

@Test
fun send_givenConnectionIsEstablished_shouldBeReceivedByTheServer() {
// Given
givenConnectionIsEstablished()
val textMessage1 = "Hello"
val textMessage2 = "Hi"
val bytesMessage1 = "Yo".toByteArray()
val bytesMessage2 = "Sup".toByteArray()
val testTextChannel = server.observeText()
val testBytesChannel = server.observeBytes()

// When
client.sendText(textMessage1)
val isSendTextSuccessful = client.sendTextAndConfirm(textMessage2)
client.sendBytes(bytesMessage1)
val isSendBytesSuccessful = client.sendBytesAndConfirm(bytesMessage2)

// Then
assertThat(isSendTextSuccessful).isTrue()
assertThat(isSendBytesSuccessful).isTrue()

serverEventObserver.awaitValues(
any<WebSocket.Event.OnConnectionOpened<*>>(),
any<WebSocket.Event.OnMessageReceived>().containingText(textMessage1),
any<WebSocket.Event.OnMessageReceived>().containingText(textMessage2),
any<WebSocket.Event.OnMessageReceived>().containingBytes(bytesMessage1),
any<WebSocket.Event.OnMessageReceived>().containingBytes(bytesMessage2)
)

runBlocking {
assertThat(testTextChannel.receiveOrNull()).isEqualTo(textMessage1)
assertThat(testTextChannel.receiveOrNull()).isEqualTo(textMessage2)

assertThat(testBytesChannel.receiveOrNull()).isEqualTo(bytesMessage1)
assertThat(testBytesChannel.receiveOrNull()).isEqualTo(bytesMessage2)
}
}

private fun givenConnectionIsEstablished() {
createClientAndServer()
serverLifecycleRegistry.onNext(Lifecycle.State.Started)
clientLifecycleRegistry.onNext(Lifecycle.State.Started)
blockUntilConnectionIsEstablish()
}

private fun createClientAndServer() {
server = createServer()
serverEventObserver = server.observeEvents().test()
client = createClient()
clientEventObserver = client.observeEvents().test()
}

private fun createServer(): Service {
val webSocketFactory = mockWebServer.newWebSocketFactory()
val scarlet = Scarlet.Builder()
.webSocketFactory(webSocketFactory)
.lifecycle(serverLifecycleRegistry)
.addStreamAdapterFactory(CoroutinesStreamAdapterFactory())
.build()
return scarlet.create()
}

private fun createClient(): Service {
val okHttpClient = OkHttpClient.Builder()
.writeTimeout(500, TimeUnit.MILLISECONDS)
.readTimeout(500, TimeUnit.MILLISECONDS)
.build()
val webSocketFactory = okHttpClient.newWebSocketFactory(serverUrlString)
val scarlet = Scarlet.Builder()
.webSocketFactory(webSocketFactory)
.lifecycle(clientLifecycleRegistry)
.addStreamAdapterFactory(CoroutinesStreamAdapterFactory())
.build()
return scarlet.create()
}

private fun blockUntilConnectionIsEstablish() {
clientEventObserver.awaitValues(
any<WebSocket.Event.OnConnectionOpened<*>>()
)
serverEventObserver.awaitValues(
any<WebSocket.Event.OnConnectionOpened<*>>()
)
}

private interface Service {
@Receive
fun observeEvents(): Stream<WebSocket.Event>

@Receive
fun observeText(): ReceiveChannel<String>

@Receive
fun observeBytes(): ReceiveChannel<ByteArray>

@Send
fun sendText(message: String)

@Send
fun sendTextAndConfirm(message: String): Boolean

@Send
fun sendBytes(message: ByteArray)

@Send
fun sendBytesAndConfirm(message: ByteArray): Boolean
}
}
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ include ':scarlet-message-adapter-protobuf'
include ':scarlet-message-adapter-gson'
include ':scarlet-stream-adapter-rxjava'
include ':scarlet-stream-adapter-rxjava2'
include ':scarlet-stream-adapter-coroutines'
include ':scarlet-lifecycle-android'
include ':demo'

0 comments on commit 5c74eff

Please sign in to comment.