Skip to content

Commit

Permalink
Use a single dispatch queue for all internal code, dispatch work on i…
Browse files Browse the repository at this point in the history
…t in public interfaces
  • Loading branch information
matus-tomlein committed Nov 8, 2023
1 parent c42d16f commit df75b3c
Show file tree
Hide file tree
Showing 45 changed files with 1,791 additions and 1,170 deletions.
234 changes: 103 additions & 131 deletions Sources/Core/Emitter/Emitter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ let POST_WRAPPER_BYTES = 88

class Emitter: EmitterEventProcessing {

private var timer: Timer?
private var timer: InternalQueueTimer?

private var _pausedEmit = false
private var pausedEmit = false

/// Custom NetworkConnection istance to handle connection outside the emitter.
private let networkConnection: NetworkConnection
Expand All @@ -30,9 +30,8 @@ class Emitter: EmitterEventProcessing {

let eventStore: EventStore

private var _sending = false
/// Whether the emitter is currently sending.
var isSending: Bool { return sync { _sending } }
var isSending: Bool = false

/// Collector endpoint.
var urlEndpoint: String? {
Expand Down Expand Up @@ -80,29 +79,19 @@ class Emitter: EmitterEventProcessing {
}
}

private var _bufferOption: BufferOption = EmitterDefaults.bufferOption
/// Buffer option
var bufferOption: BufferOption {
get { return sync { _bufferOption } }
set(bufferOption) { sync { _bufferOption = bufferOption } }
}
var bufferOption: BufferOption = EmitterDefaults.bufferOption

private weak var _callback: RequestCallback?
/// Callbacks supplied with number of failures and successes of sent events.
var callback: RequestCallback? {
get { return sync { _callback } }
set(callback) { sync { _callback = callback } }
}
weak var callback: RequestCallback?

private var _emitRange = EmitterDefaults.emitRange
/// Number of events retrieved from the database when needed.
var emitRange: Int {
get { return sync { _emitRange } }
get { return _emitRange }
set(emitRange) {
sync {
if emitRange > 0 {
_emitRange = emitRange
}
if emitRange > 0 {
_emitRange = emitRange
}
}
}
Expand All @@ -127,27 +116,23 @@ class Emitter: EmitterEventProcessing {
/// Byte limit for GET requests.
private var _byteLimitGet = EmitterDefaults.byteLimitGet
var byteLimitGet: Int {
get { return sync { _byteLimitGet } }
get { return _byteLimitGet }
set(byteLimitGet) {
sync {
_byteLimitGet = byteLimitGet
if let networkConnection = networkConnection as? DefaultNetworkConnection {
networkConnection.byteLimitGet = byteLimitGet
}
_byteLimitGet = byteLimitGet
if let networkConnection = networkConnection as? DefaultNetworkConnection {
networkConnection.byteLimitGet = byteLimitGet
}
}
}

private var _byteLimitPost = EmitterDefaults.byteLimitPost
/// Byte limit for POST requests.
var byteLimitPost: Int {
get { return sync { _byteLimitPost } }
get { return _byteLimitPost }
set(byteLimitPost) {
sync {
_byteLimitPost = byteLimitPost
if let networkConnection = networkConnection as? DefaultNetworkConnection {
networkConnection.byteLimitPost = byteLimitPost
}
_byteLimitPost = byteLimitPost
if let networkConnection = networkConnection as? DefaultNetworkConnection {
networkConnection.byteLimitPost = byteLimitPost
}
}
}
Expand Down Expand Up @@ -200,16 +185,12 @@ class Emitter: EmitterEventProcessing {
/// Custom retry rules for HTTP status codes.
private var _customRetryForStatusCodes: [Int : Bool] = [:]
var customRetryForStatusCodes: [Int : Bool]? {
get { return sync { return _customRetryForStatusCodes } }
set { sync { _customRetryForStatusCodes = newValue ?? [:] } }
get { return _customRetryForStatusCodes }
set { _customRetryForStatusCodes = newValue ?? [:] }
}

/// Whether retrying failed requests is allowed
private var _retryFailedRequests: Bool = EmitterDefaults.retryFailedRequests
var retryFailedRequests: Bool {
get { return sync { _retryFailedRequests } }
set { sync { _retryFailedRequests = newValue } }
}
var retryFailedRequests: Bool = EmitterDefaults.retryFailedRequests

/// Returns the number of events in the DB.
var dbCount: Int {
Expand Down Expand Up @@ -270,133 +251,124 @@ class Emitter: EmitterEventProcessing {
// MARK: - Pause/Resume methods

func resumeTimer() {
weak var weakSelf = self

pauseTimer()

// NOTE: consider whether it is really necessary to use the main queue or we can dispatch on a background queue
DispatchQueue.main.async {
let timer = Timer.scheduledTimer(withTimeInterval: TimeInterval(kSPDefaultBufferTimeout), repeats: true) { [weak self] timer in
self?.flush()
}
weakSelf?.sync {
weakSelf?.timer = timer
}
self.timer = InternalQueue.startTimer(TimeInterval(kSPDefaultBufferTimeout)) { [weak self] in
self?.flush()
}
}

/// Suspends timer for periodically sending events to collector.
func pauseTimer() {
sync {
timer?.invalidate()
timer = nil
}
timer?.invalidate()
timer = nil
}

/// Allows sending events to collector.
func resumeEmit() {
sync { _pausedEmit = false }
pausedEmit = false
flush()
}

/// Suspends sending events to collector.
func pauseEmit() {
sync { _pausedEmit = true }
pausedEmit = true
}

/// Insert a Payload object into the buffer to be sent to collector.
/// This method will add the payload to the database and flush (send all events).
/// - Parameter eventPayload: A Payload containing a completed event to be added into the buffer.
func addPayload(toBuffer eventPayload: Payload) {
DispatchQueue.global(qos: .default).async { [weak self] in
self?.eventStore.addEvent(eventPayload)
self?.flush()
}
self.eventStore.addEvent(eventPayload)
self.flush()
}

/// Empties the buffer of events using the respective HTTP request method.
func flush() {
if requestToStartSending() {
emitAsync {
self.attemptEmit()
self.sync { self._sending = false }
}
self.attemptEmit()
}
}

// MARK: - Control methods

private func attemptEmit() {
if eventStore.count() == 0 {
InternalQueue.onQueuePrecondition()

let events = eventStore.emittableEvents(withQueryLimit: UInt(emitRange))
if events.isEmpty {
logDebug(message: "Database empty. Returning.")
stopSending()
return
}

let events = eventStore.emittableEvents(withQueryLimit: UInt(emitRange))

let requests = buildRequests(fromEvents: events)
let sendResults = networkConnection.sendRequests(requests)

logVerbose(message: "Processing emitter results.")

var successCount = 0
var failedWillRetryCount = 0
var failedWontRetryCount = 0
var removableEvents: [Int64] = []

for result in sendResults {
let resultIndexArray = result.storeIds
if result.isSuccessful {
successCount += resultIndexArray?.count ?? 0
if let array = resultIndexArray {
removableEvents.append(contentsOf: array)

let processResults: ([RequestResult]) -> Void = { sendResults in
logVerbose(message: "Processing emitter results.")

var successCount = 0
var failedWillRetryCount = 0
var failedWontRetryCount = 0
var removableEvents: [Int64] = []

for result in sendResults {
let resultIndexArray = result.storeIds
if result.isSuccessful {
successCount += resultIndexArray?.count ?? 0
if let array = resultIndexArray {
removableEvents.append(contentsOf: array)
}
} else if result.shouldRetry(self.customRetryForStatusCodes, retryAllowed: self.retryFailedRequests) {
failedWillRetryCount += resultIndexArray?.count ?? 0
} else {
failedWontRetryCount += resultIndexArray?.count ?? 0
if let array = resultIndexArray {
removableEvents.append(contentsOf: array)
}
logError(message: String(format: "Sending events to Collector failed with status %ld. Events will be dropped.", result.statusCode ?? -1))
}
} else if result.shouldRetry(customRetryForStatusCodes, retryAllowed: retryFailedRequests) {
failedWillRetryCount += resultIndexArray?.count ?? 0
} else {
failedWontRetryCount += resultIndexArray?.count ?? 0
if let array = resultIndexArray {
removableEvents.append(contentsOf: array)
}
let allFailureCount = failedWillRetryCount + failedWontRetryCount

_ = self.eventStore.removeEvents(withIds: removableEvents)

logDebug(message: String(format: "Success Count: %d", successCount))
logDebug(message: String(format: "Failure Count: %d", allFailureCount))

if let callback = self.callback {
if allFailureCount == 0 {
callback.onSuccess(withCount: successCount)
} else {
callback.onFailure(withCount: allFailureCount, successCount: successCount)
}
logError(message: String(format: "Sending events to Collector failed with status %ld. Events will be dropped.", result.statusCode ?? -1))
}
}
let allFailureCount = failedWillRetryCount + failedWontRetryCount

let _ = eventStore.removeEvents(withIds: removableEvents)

logDebug(message: String(format: "Success Count: %d", successCount))
logDebug(message: String(format: "Failure Count: %d", allFailureCount))

if let callback = callback {
if allFailureCount == 0 {
callback.onSuccess(withCount: successCount)

if failedWillRetryCount > 0 && successCount == 0 {
logDebug(message: "Ending emitter run as all requests failed.")

self.scheduleStopSending()
} else {
callback.onFailure(withCount: allFailureCount, successCount: successCount)
self.attemptEmit()
}
}

if failedWillRetryCount > 0 && successCount == 0 {
logDebug(message: "Ending emitter run as all requests failed.")
Thread.sleep(forTimeInterval: 5)
return
} else {
self.attemptEmit()
emitAsync {
let sendResults = self.networkConnection.sendRequests(requests)

InternalQueue.async {
processResults(sendResults)
}
}
}

private func buildRequests(fromEvents events: [EmitterEvent]) -> [Request] {
var requests: [Request] = []

let sendingTime = Utilities.getTimestamp()
let httpMethod = method
let (bufferOptionValue, byteLimit) = sync {
return (
_bufferOption.rawValue,
httpMethod == .get ? _byteLimitGet : _byteLimitPost
)
}
let byteLimit = method == .get ? byteLimitGet : byteLimitPost

if httpMethod == .get {
if method == .get {
for event in events {
let payload = event.payload
addSendingTime(to: payload, timestamp: sendingTime)
Expand All @@ -410,7 +382,7 @@ class Emitter: EmitterEventProcessing {
var eventArray: [Payload] = []
var indexArray: [Int64] = []

let iUntil = min(i + bufferOptionValue, events.count)
let iUntil = min(i + bufferOption.rawValue, events.count)
for j in i..<iUntil {
let event = events[j]

Expand Down Expand Up @@ -444,7 +416,7 @@ class Emitter: EmitterEventProcessing {
let request = Request(payloads: eventArray, emitterEventIds: indexArray)
requests.append(request)
}
i += bufferOptionValue
i += bufferOption.rawValue
}
}
return requests
Expand All @@ -464,27 +436,27 @@ class Emitter: EmitterEventProcessing {
}

private func requestToStartSending() -> Bool {
return sync {
if !_sending && !_pausedEmit {
_sending = true
return true
} else {
return false
}
if !isSending && !pausedEmit {
isSending = true
return true
} else {
return false
}
}

// MARK: - dispatch queues

private let dispatchQueue = DispatchQueue(label: "snowplow.tracker.emitter")
private func scheduleStopSending() {
InternalQueue.asyncAfter(TimeInterval(5)) { [weak self] in
self?.stopSending()
}
}

private func sync<T>(_ callback: () -> T) -> T {
dispatchPrecondition(condition: .notOnQueue(dispatchQueue))

return dispatchQueue.sync(execute: callback)
private func stopSending() {
isSending = false
}

private let emitQueue = DispatchQueue(label: "snowplow.tracker.emitter.emit")
// MARK: - dispatch queues

private let emitQueue = DispatchQueue(label: "snowplow.emitter")

private func emitAsync(_ callback: @escaping () -> Void) {
emitQueue.async(execute: callback)
Expand Down
Loading

0 comments on commit df75b3c

Please sign in to comment.