Skip to content

Commit

Permalink
Further improve concurrency in EventPlatformClient. (#4494)
Browse files Browse the repository at this point in the history
* Further improve concurrency in EventPlatformClient.

* Lint.

---------

Co-authored-by: Sharvani Haran <[email protected]>
  • Loading branch information
dbrant and sharvaniharan authored Feb 29, 2024
1 parent 42bf68f commit 12e0938
Showing 1 changed file with 14 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ import org.wikipedia.util.ReleaseUtil
import org.wikipedia.util.log.L
import java.net.HttpURLConnection
import java.util.*
import java.util.concurrent.ConcurrentHashMap

object EventPlatformClient {
/**
* Stream configs to be fetched on startup and stored for the duration of the app lifecycle.
*/
private val STREAM_CONFIGS = mutableMapOf<String, StreamConfig>()
private val STREAM_CONFIGS = ConcurrentHashMap<String, StreamConfig>()

/*
* When ENABLED is false, items can be enqueued but not dequeued.
Expand All @@ -31,12 +32,10 @@ object EventPlatformClient {
*/
private var ENABLED = WikipediaApp.instance.isOnline

@Synchronized
fun setStreamConfig(streamConfig: StreamConfig) {
STREAM_CONFIGS[streamConfig.streamName] = streamConfig
}

@Synchronized
fun getStreamConfig(name: String): StreamConfig? {
return STREAM_CONFIGS[name]
}
Expand All @@ -45,7 +44,6 @@ object EventPlatformClient {
* Set whether the client is enabled. This can react to device online/offline state as well
* as other considerations.
*/
@Synchronized
fun setEnabled(enabled: Boolean) {
ENABLED = enabled
if (ENABLED) {
Expand All @@ -62,7 +60,6 @@ object EventPlatformClient {
*
* @param event event
*/
@Synchronized
fun submit(event: Event) {
if (!SamplingController.isInSample(event)) {
return
Expand All @@ -81,14 +78,12 @@ object EventPlatformClient {
.subscribe({ updateStreamConfigs(it.streamConfigs) }) { L.e(it) }
}

@Synchronized
private fun updateStreamConfigs(streamConfigs: Map<String, StreamConfig>) {
STREAM_CONFIGS.clear()
STREAM_CONFIGS.putAll(streamConfigs)
Prefs.streamConfigs = STREAM_CONFIGS
}

@Synchronized
fun setUpStreamConfigs() {
STREAM_CONFIGS.clear()
STREAM_CONFIGS.putAll(Prefs.streamConfigs)
Expand All @@ -115,12 +110,15 @@ object EventPlatformClient {
private const val TOKEN = "sendScheduled"
private val MAX_QUEUE_SIZE get() = Prefs.analyticsQueueSize

@Synchronized
fun sendAllScheduled() {
WikipediaApp.instance.mainThreadHandler.removeCallbacksAndMessages(TOKEN)
if (ENABLED) {
send()
QUEUE.clear()
val eventsByStream: Map<String, List<Event>>
synchronized(QUEUE) {
eventsByStream = QUEUE.groupBy { it.stream }
QUEUE.clear()
}
send(eventsByStream)
}
}

Expand All @@ -129,10 +127,11 @@ object EventPlatformClient {
*
* @param event event data
*/
@Synchronized
fun schedule(event: Event) {
if (ENABLED || QUEUE.size <= MAX_QUEUE_SIZE) {
QUEUE.add(event)
synchronized(QUEUE) {
QUEUE.add(event)
}
}
if (ENABLED) {
if (QUEUE.size >= MAX_QUEUE_SIZE) {
Expand All @@ -152,16 +151,16 @@ object EventPlatformClient {
* Also batch the events ordered by their streams, as the QUEUE
* can contain events of different streams
*/
private fun send() {
QUEUE.groupBy { it.stream }.forEach { (stream, events) ->
private fun send(eventsByStream: Map<String, List<Event>>) {
eventsByStream.forEach { (stream, events) ->
getStreamConfig(stream)?.let {
sendEventsForStream(it, events)
}
}
}

@SuppressLint("CheckResult")
fun sendEventsForStream(streamConfig: StreamConfig, events: List<Event>) {
private fun sendEventsForStream(streamConfig: StreamConfig, events: List<Event>) {
(if (ReleaseUtil.isDevRelease)
ServiceFactory.getAnalyticsRest(streamConfig).postEvents(events)
else
Expand Down

0 comments on commit 12e0938

Please sign in to comment.