Skip to content

Commit

Permalink
Improve emitter concurrency model
Browse files Browse the repository at this point in the history
  • Loading branch information
matus-tomlein committed Oct 16, 2023
1 parent 4584aa1 commit c42d16f
Show file tree
Hide file tree
Showing 10 changed files with 332 additions and 380 deletions.
437 changes: 184 additions & 253 deletions Sources/Core/Emitter/Emitter.swift

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions Sources/Core/NetworkConnection/NetworkControllerImpl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ import Foundation
class NetworkControllerImpl: Controller, NetworkController {
private var requestCallback: RequestCallback?

var isCustomNetworkConnection: Bool {
return emitter.networkConnection != nil && !(emitter.networkConnection is DefaultNetworkConnection)
}

// MARK: - Properties

var endpoint: String? {
Expand Down
27 changes: 18 additions & 9 deletions Sources/Core/Tracker/ServiceProvider.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import Foundation

class ServiceProvider: NSObject, ServiceProviderProtocol {
private(set) var namespace: String
let namespace: String

var isTrackerInitialized: Bool { return _tracker != nil }

Expand Down Expand Up @@ -233,27 +233,36 @@ class ServiceProvider: NSObject, ServiceProviderProtocol {

func makeEmitter() -> Emitter {
let builder = { (emitter: Emitter) in
emitter.method = self.networkConfiguration.method
emitter.protocol = self.networkConfiguration.protocol
emitter.customPostPath = self.networkConfiguration.customPostPath
emitter.requestHeaders = self.networkConfiguration.requestHeaders
emitter.emitThreadPoolSize = self.emitterConfiguration.threadPoolSize
emitter.byteLimitGet = self.emitterConfiguration.byteLimitGet
emitter.byteLimitPost = self.emitterConfiguration.byteLimitPost
emitter.serverAnonymisation = self.emitterConfiguration.serverAnonymisation
emitter.emitRange = self.emitterConfiguration.emitRange
emitter.bufferOption = self.emitterConfiguration.bufferOption
emitter.eventStore = self.emitterConfiguration.eventStore
emitter.callback = self.emitterConfiguration.requestCallback
emitter.customRetryForStatusCodes = self.emitterConfiguration.customRetryForStatusCodes
emitter.retryFailedRequests = self.emitterConfiguration.retryFailedRequests
}

let emitter: Emitter
if let networkConnection = networkConfiguration.networkConnection {
emitter = Emitter(networkConnection: networkConnection, builder: builder)
emitter = Emitter(
networkConnection: networkConnection,
namespace: self.namespace,
eventStore: self.emitterConfiguration.eventStore,
builder: builder
)
} else {
emitter = Emitter(urlEndpoint: networkConfiguration.endpoint ?? "", builder: builder)
emitter = Emitter(
namespace: self.namespace,
urlEndpoint: networkConfiguration.endpoint ?? "",
method: self.networkConfiguration.method,
protocol: self.networkConfiguration.protocol,
customPostPath: self.networkConfiguration.customPostPath,
requestHeaders: self.networkConfiguration.requestHeaders,
serverAnonymisation: self.emitterConfiguration.serverAnonymisation,
eventStore: self.emitterConfiguration.eventStore,
builder: builder
)
}

if emitterConfiguration.isPaused {
Expand Down
2 changes: 0 additions & 2 deletions Sources/Core/Tracker/Tracker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,6 @@ class Tracker {
}

private func setup() {
_emitter.namespace = self.trackerData.trackerNamespace // Needed to correctly send events to the right EventStore

if sessionContext {
self.trackerData.session = Session(
foregroundTimeout: self.trackerData.foregroundTimeout,
Expand Down
181 changes: 99 additions & 82 deletions Sources/Snowplow/Network/DefaultNetworkConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,75 +19,84 @@ public class DefaultNetworkConnection: NSObject, NetworkConnection {
// The protocol for connection to the collector
@objc
public var `protocol`: ProtocolOptions {
get {
return _protocol
}
set {
_protocol = newValue
if builderFinished { setup() }
}
get { return sync { _protocol } }
set { sync { _protocol = newValue; setup() } }
}

private var _urlString: String
/// The collector endpoint.
@objc
public var urlString: String {
get {
return urlEndpoint?.absoluteString ?? _urlString
}
set {
_urlString = newValue
if builderFinished { setup() }
}
get { return sync { urlEndpoint?.absoluteString ?? _urlString } }
set { sync { _urlString = newValue; setup() } }
}
@objc
private(set) public var urlEndpoint: URL?

private var _urlEndpoint: URL?
public var urlEndpoint: URL? { sync { return _urlEndpoint } }

private var _httpMethod: HttpMethodOptions = .post
/// HTTP method, should be .get or .post.
@objc
public var httpMethod: HttpMethodOptions {
get {
return _httpMethod
}
set(method) {
_httpMethod = method
if builderFinished && urlEndpoint != nil {
setup()
}
}
get { return sync { _httpMethod } }
set(method) { sync { _httpMethod = method; setup() } }
}

private var _emitThreadPoolSize = 15
/// The number of threads used by the emitter.
@objc
public var emitThreadPoolSize: Int {
get {
return _emitThreadPoolSize
}
get { sync { return _emitThreadPoolSize } }
set(emitThreadPoolSize) {
self._emitThreadPoolSize = emitThreadPoolSize
if dataOperationQueue.maxConcurrentOperationCount != emitThreadPoolSize {
dataOperationQueue.maxConcurrentOperationCount = emitThreadPoolSize
sync {
self._emitThreadPoolSize = emitThreadPoolSize
if dataOperationQueue.maxConcurrentOperationCount != emitThreadPoolSize {
dataOperationQueue.maxConcurrentOperationCount = emitThreadPoolSize
}
}
}
}

private var _byteLimitGet: Int = 40000
/// Maximum event size for a GET request.
public var byteLimitGet: Int = 40000
@objc
public var byteLimitGet: Int {
get { return sync { _byteLimitGet } }
set { sync { _byteLimitGet = newValue } }
}

private var _byteLimitPost = 40000
/// Maximum event size for a POST request.
@objc
public var byteLimitPost = 40000
public var byteLimitPost: Int {
get { return sync { _byteLimitPost } }
set { sync { _byteLimitPost = newValue } }
}

private var _customPostPath: String?
/// A custom path that is used on the endpoint to send requests.
@objc
public var customPostPath: String?
public var customPostPath: String? {
get { return sync { _customPostPath } }
set { sync { _customPostPath = newValue; setup() } }
}

private var _requestHeaders: [String : String]?
/// Custom headers (key, value) for http requests.
@objc
public var requestHeaders: [String : String]?
public var requestHeaders: [String : String]? {
get { return sync { _requestHeaders } }
set { sync { _requestHeaders = newValue } }
}
/// Whether to anonymise server-side user identifiers including the `network_userid` and `user_ipaddress`

private var _serverAnonymisation = false
@objc
public var serverAnonymisation = false
public var serverAnonymisation: Bool {
get { return sync { _serverAnonymisation } }
set { sync { _serverAnonymisation = newValue } }
}
private var dataOperationQueue = OperationQueue()
private var builderFinished = false

@objc
public init(urlString: String,
Expand All @@ -96,52 +105,13 @@ public class DefaultNetworkConnection: NSObject, NetworkConnection {
customPostPath: String? = nil) {
self._urlString = urlString
super.init()
self.httpMethod = httpMethod
self.protocol = `protocol`
self.customPostPath = customPostPath
self._httpMethod = httpMethod
self._protocol = `protocol`
self._customPostPath = customPostPath
setup()
}

// MARK: - Implement SPNetworkConnection protocol

private func setup() {
// Decode url to extract protocol
let url = URL(string: _urlString)
var endpoint = _urlString
if url?.scheme == "https" {
`protocol` = .https
} else if url?.scheme == "http" {
`protocol` = .http
} else {
`protocol` = .https
endpoint = "https://\(_urlString)"
}

// Configure
let urlPrefix = `protocol` == .http ? "http://" : "https://"
var urlSuffix = _httpMethod == .get ? kSPEndpointGet : kSPEndpointPost
if _httpMethod == .post {
if let customPostPath = customPostPath { urlSuffix = customPostPath }
}

// Remove trailing slashes from endpoint to avoid double slashes when appending path
endpoint = endpoint.trimmingCharacters(in: CharacterSet(charactersIn: "/"))

urlEndpoint = URL(string: endpoint)?.appendingPathComponent(urlSuffix)

// Log
if urlEndpoint?.scheme != nil && urlEndpoint?.host != nil {
logDebug(message: "Emitter URL created successfully '\(urlEndpoint?.absoluteString ?? "-")'")
} else {
logDebug(message: "Invalid emitter URL: '\(urlEndpoint?.absoluteString ?? "-")'")
}
let userDefaults = UserDefaults.standard
userDefaults.set(endpoint, forKey: kSPErrorTrackerUrl)
userDefaults.set(urlSuffix, forKey: kSPErrorTrackerProtocol)
userDefaults.set(urlPrefix, forKey: kSPErrorTrackerMethod)

builderFinished = true
}

@objc
public func sendRequests(_ requests: [Request]) -> [RequestResult] {
Expand Down Expand Up @@ -185,8 +155,45 @@ public class DefaultNetworkConnection: NSObject, NetworkConnection {
}

// MARK: - Private methods

private func setup() {
// Decode url to extract protocol
let url = URL(string: _urlString)
var endpoint = _urlString
if url?.scheme == "https" {
_protocol = .https
} else if url?.scheme == "http" {
_protocol = .http
} else {
_protocol = .https
endpoint = "https://\(_urlString)"
}

// Configure
let urlPrefix = _protocol == .http ? "http://" : "https://"
var urlSuffix = _httpMethod == .get ? kSPEndpointGet : kSPEndpointPost
if _httpMethod == .post {
if let customPostPath = _customPostPath { urlSuffix = customPostPath }
}

// Remove trailing slashes from endpoint to avoid double slashes when appending path
endpoint = endpoint.trimmingCharacters(in: CharacterSet(charactersIn: "/"))

func buildPost(_ request: Request) -> URLRequest {
_urlEndpoint = URL(string: endpoint)?.appendingPathComponent(urlSuffix)

// Log
if _urlEndpoint?.scheme != nil && _urlEndpoint?.host != nil {
logDebug(message: "Emitter URL created successfully '\(_urlEndpoint?.absoluteString ?? "-")'")
} else {
logDebug(message: "Invalid emitter URL: '\(_urlEndpoint?.absoluteString ?? "-")'")
}
let userDefaults = UserDefaults.standard
userDefaults.set(endpoint, forKey: kSPErrorTrackerUrl)
userDefaults.set(urlSuffix, forKey: kSPErrorTrackerProtocol)
userDefaults.set(urlPrefix, forKey: kSPErrorTrackerMethod)
}

private func buildPost(_ request: Request) -> URLRequest {
var requestData: Data? = nil
do {
requestData = try JSONSerialization.data(withJSONObject: request.payload?.dictionary ?? [:], options: [])
Expand All @@ -208,7 +215,7 @@ public class DefaultNetworkConnection: NSObject, NetworkConnection {
return urlRequest
}

func buildGet(_ request: Request) -> URLRequest {
private func buildGet(_ request: Request) -> URLRequest {
let payload = request.payload?.dictionary ?? [:]
let url = "\(urlEndpoint!.absoluteString)?\(Utilities.urlEncode(payload))"
let anUrl = URL(string: url)!
Expand All @@ -224,11 +231,21 @@ public class DefaultNetworkConnection: NSObject, NetworkConnection {
return urlRequest
}

func applyValuesAndHeaderFields(_ requestHeaders: [String : String], to request: inout URLRequest) {
private func applyValuesAndHeaderFields(_ requestHeaders: [String : String], to request: inout URLRequest) {
(requestHeaders as NSDictionary).enumerateKeysAndObjects({ key, obj, stop in
if let key = key as? String, let obj = obj as? String {
request.setValue(obj, forHTTPHeaderField: key)
}
})
}

// MARK: - dispatch queues

private let dispatchQueue = DispatchQueue(label: "snowplow.tracker.network_connection")

private func sync<T>(_ callback: () -> T) -> T {
dispatchPrecondition(condition: .notOnQueue(dispatchQueue))

return dispatchQueue.sync(execute: callback)
}
}
26 changes: 15 additions & 11 deletions Tests/Legacy Tests/LegacyTestEmitter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,16 @@ class LegacyTestEmitter: XCTestCase {
func testEmitterBuilderAndOptions() {
let `protocol` = "https"

let emitter = Emitter(urlEndpoint: TEST_SERVER_EMITTER) { emitter in
emitter.method = .post
emitter.protocol = .https
emitter.emitThreadPoolSize = 30
let emitter = Emitter(
namespace: "ns1",
urlEndpoint: TEST_SERVER_EMITTER,
method: .post,
protocol: .https
) { emitter in
emitter.byteLimitGet = 30000
emitter.byteLimitPost = 35000
emitter.emitRange = 500
emitter.emitThreadPoolSize = 30
}

var url = "\(`protocol`)://\(TEST_SERVER_EMITTER)/com.snowplowanalytics.snowplow/tp2"
Expand All @@ -71,11 +74,13 @@ class LegacyTestEmitter: XCTestCase {
XCTAssertEqual(emitter.byteLimitPost, 35000)
XCTAssertEqual(emitter.protocol, .https)

let customPathEmitter = Emitter(urlEndpoint: TEST_SERVER_EMITTER) { emitter in
emitter.method = .post
emitter.protocol = .https
emitter.customPostPath = "/com.acme.company/tpx"
emitter.emitThreadPoolSize = 30
let customPathEmitter = Emitter(
namespace: "ns2",
urlEndpoint: TEST_SERVER_EMITTER,
method: .post,
protocol: .https,
customPostPath: "/com.acme.company/tpx"
) { emitter in
emitter.byteLimitGet = 30000
emitter.byteLimitPost = 35000
emitter.emitRange = 500
Expand Down Expand Up @@ -367,12 +372,11 @@ class LegacyTestEmitter: XCTestCase {
// MARK: - Emitter builder

func emitter(with networkConnection: NetworkConnection, bufferOption: BufferOption = .single) -> Emitter {
let emitter = Emitter(networkConnection: networkConnection) { emitter in
let emitter = Emitter(networkConnection: networkConnection, namespace: "ns1", eventStore: MockEventStore()) { emitter in
emitter.bufferOption = bufferOption
emitter.emitRange = 200
emitter.byteLimitGet = 20000
emitter.byteLimitPost = 25000
emitter.eventStore = MockEventStore()
}
return emitter
}
Expand Down
4 changes: 1 addition & 3 deletions Tests/TestLifecycleState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ class TestLifecycleState: XCTestCase {

func testLifecycleStateMachine() {
let eventStore = MockEventStore()
let emitter = Emitter(urlEndpoint: "http://snowplow-fake-url.com") { emitter in
emitter.eventStore = eventStore
}
let emitter = Emitter(namespace: "namespace", urlEndpoint: "http://snowplow-fake-url.com", eventStore: eventStore)
let tracker = Tracker(trackerNamespace: "namespace", appId: nil, emitter: emitter) { tracker in
tracker.base64Encoded = false
tracker.lifecycleEvents = true
Expand Down
Loading

0 comments on commit c42d16f

Please sign in to comment.