Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/real time flags (#5) #21

6 changes: 6 additions & 0 deletions .github/workflows/verify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ jobs:
name: Run Unit Tests
runs-on: ubuntu-latest

env:
INTEGRATION_TESTS_ENVIRONMENT_KEY: NTtWcerSBE5yj7a5optMSk
INTEGRATION_TESTS_FEATURE_NAME: integration-test-feature
INTEGRATION_TESTS_FEATURE_STATE_ID: 321715
INTEGRATION_TESTS_API_TOKEN: ${{ secrets.INTEGRATION_TESTS_API_TOKEN }}

steps:
- uses: actions/checkout@v3
- name: Preconfigure gradle
Expand Down
14 changes: 14 additions & 0 deletions FlagsmithClient/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,16 @@ android {

dependencies {
implementation("com.google.code.gson:gson:2.10")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4")

// HTTP Client
implementation("com.squareup.retrofit2:retrofit:2.9.0")
implementation("com.squareup.retrofit2:converter-gson:2.9.0")

// Server Sent Events
implementation("com.squareup.okhttp3:okhttp-sse:4.11.0")
testImplementation("com.squareup.okhttp3:okhttp-sse:4.11.0")

testImplementation("junit:junit:4.13.2")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.6.4")
testImplementation("org.mock-server:mockserver-netty-no-dependencies:5.14.0")
Expand Down Expand Up @@ -101,6 +106,15 @@ kover {
}

tasks.withType(Test::class) {
// if the excludeIntegrationTests property is set
// then exclude tests with IntegrationTest in the name
// i.e. `gradle :FlagsmithClient:testDebugUnitTest --tests "com.flagsmith.*" -P excludeIntegrationTests`
if (project.hasProperty("excludeIntegrationTests")) {
exclude {
it.name.contains("IntegrationTest")
}
}

testLogging {
events(
TestLogEvent.FAILED,
Expand Down
94 changes: 86 additions & 8 deletions FlagsmithClient/src/main/java/com/flagsmith/Flagsmith.kt
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
package com.flagsmith

import android.content.Context
import com.flagsmith.entities.*
import android.util.Log
import com.flagsmith.entities.Flag
import com.flagsmith.entities.Identity
import com.flagsmith.entities.IdentityFlagsAndTraits
import com.flagsmith.entities.Trait
import com.flagsmith.entities.TraitWithIdentity
import com.flagsmith.internal.FlagsmithAnalytics
import com.flagsmith.internal.FlagsmithEventService
import com.flagsmith.internal.FlagsmithEventTimeTracker
import com.flagsmith.internal.FlagsmithRetrofitService
import com.flagsmith.internal.enqueueWithResult
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.onEach
import okhttp3.Cache

/**
* Flagsmith
Expand All @@ -21,27 +32,75 @@ import com.flagsmith.internal.enqueueWithResult
class Flagsmith constructor(
private val environmentKey: String,
private val baseUrl: String = "https://edge.api.flagsmith.com/api/v1/",
private val eventSourceUrl: String? = null,
private val context: Context? = null,
private val enableAnalytics: Boolean = DEFAULT_ENABLE_ANALYTICS,
private val enableRealtimeUpdates: Boolean = false,
private val analyticsFlushPeriod: Int = DEFAULT_ANALYTICS_FLUSH_PERIOD_SECONDS,
private val cacheConfig: FlagsmithCacheConfig = FlagsmithCacheConfig(),
private val defaultFlags: List<Flag> = emptyList(),
private val requestTimeoutSeconds: Long = 4L,
private val readTimeoutSeconds: Long = 6L,
private val writeTimeoutSeconds: Long = 6L
) {
private val retrofit: FlagsmithRetrofitService = FlagsmithRetrofitService.create(
baseUrl = baseUrl, environmentKey = environmentKey, context = context, cacheConfig = cacheConfig,
requestTimeoutSeconds = requestTimeoutSeconds, readTimeoutSeconds = readTimeoutSeconds, writeTimeoutSeconds = writeTimeoutSeconds)
private val writeTimeoutSeconds: Long = 6L,
override var lastSeenAt: Double = 0.0 // from FlagsmithEventTimeTracker
) : FlagsmithEventTimeTracker {
private lateinit var retrofit: FlagsmithRetrofitService
private var cache: Cache? = null
private var lastUsedIdentity: String? = null
private val analytics: FlagsmithAnalytics? =
if (!enableAnalytics) null
else if (context != null) FlagsmithAnalytics(context, retrofit, analyticsFlushPeriod)
else throw IllegalArgumentException("Flagsmith requires a context to use the analytics feature")

private val eventService: FlagsmithEventService? =
if (!enableRealtimeUpdates) null
else FlagsmithEventService(eventSourceUrl = eventSourceUrl, environmentKey = environmentKey) { event ->
if (event.isSuccess) {
lastEventUpdate = event.getOrNull()?.updatedAt ?: lastEventUpdate

// Check whether this event is anything new
if (lastEventUpdate > lastSeenAt) {
// First evict the cache otherwise we'll be stuck with the old values
cache?.evictAll()
lastSeenAt = lastEventUpdate

// Now we can get the new values
getFeatureFlags(lastUsedIdentity) { res ->
if (res.isFailure) {
Log.e(
"Flagsmith",
"Error getting flags in SSE stream: ${res.exceptionOrNull()}"
)
} else {
Log.i("Flagsmith", "Got flags due to SSE event: $event")

// If the customer wants to subscribe to updates, emit the new flags
flagUpdateFlow.tryEmit(res.getOrNull() ?: emptyList())
}
}
}
}
}

// The last time we got an event from the SSE stream or via the API
private var lastEventUpdate: Double = 0.0

/// Stream of flag updates from the SSE stream if enabled
val flagUpdateFlow = MutableStateFlow<List<Flag>>(listOf())

init {
if (cacheConfig.enableCache && context == null) {
throw IllegalArgumentException("Flagsmith requires a context to use the cache feature")
}
if (enableRealtimeUpdates) {
getFlagUpdates()
}
val pair = FlagsmithRetrofitService.create(
baseUrl = baseUrl, environmentKey = environmentKey, context = context, cacheConfig = cacheConfig,
requestTimeoutSeconds = requestTimeoutSeconds, readTimeoutSeconds = readTimeoutSeconds,
writeTimeoutSeconds = writeTimeoutSeconds, timeTracker = this)
retrofit = pair.first
cache = pair.second
}

companion object {
Expand All @@ -50,8 +109,12 @@ class Flagsmith constructor(
}

fun getFeatureFlags(identity: String? = null, result: (Result<List<Flag>>) -> Unit) {
// Save the last used identity as we'll refresh with this if we get update events
lastUsedIdentity = identity

if (identity != null) {
retrofit.getIdentityFlagsAndTraits(identity).enqueueWithResult { res ->
flagUpdateFlow.tryEmit(res.getOrNull()?.flags ?: emptyList())
result(res.map { it.flags })
}
} else {
Expand All @@ -78,18 +141,19 @@ class Flagsmith constructor(
fun getTrait(id: String, identity: String, result: (Result<Trait?>) -> Unit) =
retrofit.getIdentityFlagsAndTraits(identity).enqueueWithResult { res ->
result(res.map { value -> value.traits.find { it.key == id } })
}
}.also { lastUsedIdentity = identity }

fun getTraits(identity: String, result: (Result<List<Trait>>) -> Unit) =
retrofit.getIdentityFlagsAndTraits(identity).enqueueWithResult { res ->
result(res.map { it.traits })
}
}.also { lastUsedIdentity = identity }

fun setTrait(trait: Trait, identity: String, result: (Result<TraitWithIdentity>) -> Unit) =
retrofit.postTraits(TraitWithIdentity(trait.key, trait.value, Identity(identity))).enqueueWithResult(result = result)

fun getIdentity(identity: String, result: (Result<IdentityFlagsAndTraits>) -> Unit) =
retrofit.getIdentityFlagsAndTraits(identity).enqueueWithResult(defaults = null, result = result)
.also { lastUsedIdentity = identity }

private fun getFeatureFlag(
featureId: String,
Expand All @@ -101,6 +165,20 @@ class Flagsmith constructor(
analytics?.trackEvent(featureId)
foundFlag
})
}.also { lastUsedIdentity = identity }

private fun getFlagUpdates() {
if (eventService == null) return
eventService.sseEventsFlow.onEach {
getFeatureFlags { res ->
if (res.isFailure) {
Log.e("Flagsmith", "Error getting flags in SSE stream: ${res.exceptionOrNull()}")
return@getFeatureFlags
}
}
}.catch {
Log.e("Flagsmith", "Error in SSE stream: $it")
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.flagsmith.entities

internal data class FeatureStatePutBody (
val enabled: Boolean,
val feature_state_value: Any?
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.flagsmith.entities

import com.google.gson.annotations.SerializedName

internal data class FlagEvent (
@SerializedName(value = "updated_at") val updatedAt: Double
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.flagsmith.entities


import com.google.gson.annotations.SerializedName
import java.io.Reader

data class Trait(
val identifier: String? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import android.util.Log
import org.json.JSONException
import org.json.JSONObject

class FlagsmithAnalytics constructor(
internal class FlagsmithAnalytics constructor(
private val context: Context,
private val retrofitService: FlagsmithRetrofitService,
private val flushPeriod: Int
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.flagsmith.internal

import android.util.Log
import com.flagsmith.entities.FlagEvent
import com.google.gson.Gson
import kotlinx.coroutines.flow.MutableStateFlow
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.sse.EventSource
import okhttp3.sse.EventSourceListener
import okhttp3.sse.EventSources
import java.util.concurrent.TimeUnit

internal class FlagsmithEventService constructor(
private val eventSourceUrl: String?,
private val environmentKey: String,
private val updates: (Result<FlagEvent>) -> Unit
) {
private val defaultEventSourceHost = "https://realtime.flagsmith.com/"

private val sseClient = OkHttpClient.Builder()
.addInterceptor(FlagsmithRetrofitService.envKeyInterceptor(environmentKey))
.connectTimeout(6, TimeUnit.SECONDS)
.readTimeout(10, TimeUnit.MINUTES)
.writeTimeout(10, TimeUnit.MINUTES)
.build()

private val defaultEventSourceUrl: String = defaultEventSourceHost + "sse/environments/" + environmentKey + "/stream"

private val sseRequest = Request.Builder()
.url(eventSourceUrl ?: defaultEventSourceUrl)
.header("Accept", "application/json")
.addHeader("Accept", "text/event-stream")
.build()

private var currentEventSource: EventSource? = null

var sseEventsFlow = MutableStateFlow(FlagEvent(updatedAt = 0.0))
private set

private val sseEventSourceListener = object : EventSourceListener() {
override fun onClosed(eventSource: EventSource) {
super.onClosed(eventSource)
Log.d(TAG, "onClosed: $eventSource")

// This isn't uncommon and is the nature of HTTP requests, so just reconnect
initEventSource()
}

override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) {
super.onEvent(eventSource, id, type, data)
Log.d(TAG, "onEvent: $data")
if (type != null && type == "environment_updated" && data.isNotEmpty()) {
val flagEvent = Gson().fromJson(data, FlagEvent::class.java)
sseEventsFlow.tryEmit(flagEvent)
updates(Result.success(flagEvent))
}
}

override fun onFailure(eventSource: EventSource, t: Throwable?, response: Response?) {
super.onFailure(eventSource, t, response)
t?.printStackTrace()
Log.d(TAG, "onFailure: ${t?.message}")
if (t != null)
updates(Result.failure(t))
else
updates(Result.failure(Throwable("Unknown error")))
}

override fun onOpen(eventSource: EventSource, response: Response) {
super.onOpen(eventSource, response)
Log.d(TAG, "onOpen: $eventSource")
}
}

init {
initEventSource()
}

private fun initEventSource() {
currentEventSource?.cancel()
currentEventSource = EventSources.createFactory(sseClient)
.newEventSource(request = sseRequest, listener = sseEventSourceListener)
}

companion object {
private const val TAG = "FlagsmithEventService"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.flagsmith.internal

///
internal interface FlagsmithEventTimeTracker {
var lastSeenAt: Double
}
Loading