From fc11da226c460168373dcc3226f4074396a3c073 Mon Sep 17 00:00:00 2001 From: Tim Schmelter Date: Wed, 6 Mar 2019 08:58:03 -0800 Subject: [PATCH 1/2] Add Subscription status change handler; fix AppSyncMQTTClient data races - Fixed AppSyncMQTTClient data races (#184) - Added `statusChangeHandler` to AppSyncClient's `subscribe` method to provide status updates. - Added verbose logging around subscription connections - Cancel streams and reconnect threads on disconnect --- AWSAppSyncClient.xcodeproj/project.pbxproj | 12 + AWSAppSyncClient/AWSAppSyncClient.swift | 27 +- .../AWSAppSyncSubscriptionError.swift | 97 +++ .../AWSAppSyncSubscriptionWatcher.swift | 228 +++++-- .../AWSAppSyncSubscriptionWatcherStatus.swift | 66 ++ AWSAppSyncClient/Deprecations+Removals.swift | 25 + .../Internal/AppSyncLogWrapper.swift | 4 + .../Internal/AppSyncMQTTClient.swift | 563 +++++++++++++----- .../AppSyncSubscriptionWithSync.swift | 66 +- .../Internal/Foundation+Utils.swift | 31 + AWSAppSyncClient/MQTTSDK/AWSIoTMQTTClient.m | 122 ++-- .../AWSAppSyncAPIKeyAuthTests.swift | 267 --------- .../AppSyncMQTTClientTests.swift | 54 +- .../Helpers/MockSubscriptionWatcher.swift | 32 +- .../SubscriptionStressTestHelper.swift | 249 ++++---- .../SubscriptionTests.swift | 346 +++++++++++ CHANGELOG.md | 17 +- 17 files changed, 1446 insertions(+), 760 deletions(-) create mode 100644 AWSAppSyncClient/AWSAppSyncSubscriptionError.swift create mode 100644 AWSAppSyncClient/AWSAppSyncSubscriptionWatcherStatus.swift create mode 100644 AWSAppSyncIntegrationTests/SubscriptionTests.swift diff --git a/AWSAppSyncClient.xcodeproj/project.pbxproj b/AWSAppSyncClient.xcodeproj/project.pbxproj index 3a952a13..fd6b8694 100644 --- a/AWSAppSyncClient.xcodeproj/project.pbxproj +++ b/AWSAppSyncClient.xcodeproj/project.pbxproj @@ -123,6 +123,9 @@ FA0C12BB21CD308A00B438CB /* AWSAppSyncClientConfiguration.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA0C12B921CD2FFB00B438CB /* AWSAppSyncClientConfiguration.swift */; }; FA0C12BF21CD360B00B438CB /* AWSAppSyncAuthType.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA0C12BE21CD360B00B438CB /* AWSAppSyncAuthType.swift */; }; FA0C12C121CD3BB200B438CB /* BasicAWSAPIKeyAuthProvider.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA0C12C021CD3BB200B438CB /* BasicAWSAPIKeyAuthProvider.swift */; }; + FA0D825422307BA400E0EA82 /* AWSAppSyncSubscriptionWatcherStatus.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA0D825322307BA400E0EA82 /* AWSAppSyncSubscriptionWatcherStatus.swift */; }; + FA0D82582230D0AF00E0EA82 /* AWSAppSyncSubscriptionError.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA0D82572230D0AF00E0EA82 /* AWSAppSyncSubscriptionError.swift */; }; + FA0D825C22317C7900E0EA82 /* SubscriptionTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA0D825B22317C7900E0EA82 /* SubscriptionTests.swift */; }; FA1A620C21E6533A00AA54D0 /* AWSAppSyncRetryHandlerTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA1A620B21E6533A00AA54D0 /* AWSAppSyncRetryHandlerTests.swift */; }; FA2B4598221DDF2C00F68E6C /* CachePersistenceTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA2B4597221DDF2C00F68E6C /* CachePersistenceTests.swift */; }; FA2B459A221F436400F68E6C /* MutationQueuePerformanceTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA2B4599221F436400F68E6C /* MutationQueuePerformanceTests.swift */; }; @@ -444,6 +447,9 @@ FA0C12BE21CD360B00B438CB /* AWSAppSyncAuthType.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSAppSyncAuthType.swift; sourceTree = ""; }; FA0C12C021CD3BB200B438CB /* BasicAWSAPIKeyAuthProvider.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = BasicAWSAPIKeyAuthProvider.swift; sourceTree = ""; }; FA0C12C721CD96BF00B438CB /* AWSAppSyncClientConfigurationTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSAppSyncClientConfigurationTests.swift; sourceTree = ""; }; + FA0D825322307BA400E0EA82 /* AWSAppSyncSubscriptionWatcherStatus.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSAppSyncSubscriptionWatcherStatus.swift; sourceTree = ""; }; + FA0D82572230D0AF00E0EA82 /* AWSAppSyncSubscriptionError.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSAppSyncSubscriptionError.swift; sourceTree = ""; }; + FA0D825B22317C7900E0EA82 /* SubscriptionTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscriptionTests.swift; sourceTree = ""; }; FA1A620B21E6533A00AA54D0 /* AWSAppSyncRetryHandlerTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSAppSyncRetryHandlerTests.swift; sourceTree = ""; }; FA2B4597221DDF2C00F68E6C /* CachePersistenceTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CachePersistenceTests.swift; sourceTree = ""; }; FA2B4599221F436400F68E6C /* MutationQueuePerformanceTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MutationQueuePerformanceTests.swift; sourceTree = ""; }; @@ -776,7 +782,9 @@ FAAACC2521DD7BC300D24B37 /* AWSAppSyncMutations.swift */, FAF5D78C21D3F04E00FC04D2 /* AWSAppSyncServiceConfig.swift */, FA3E128421D5297500F2D19A /* AWSAppSyncServiceConfigError.swift */, + FA0D82572230D0AF00E0EA82 /* AWSAppSyncSubscriptionError.swift */, 1729A0CF1FA1365900F88594 /* AWSAppSyncSubscriptionWatcher.swift */, + FA0D825322307BA400E0EA82 /* AWSAppSyncSubscriptionWatcherStatus.swift */, DF9468DA20E1CA4A00E40482 /* AWSNetworkTransport.swift */, 17A7F8C91FB8BCBC008D1393 /* AWSS3ObjectProtocol.swift */, CCEF79E021DE7FF7004AD64D /* Deprecations+Removals.swift */, @@ -885,6 +893,7 @@ 17664128214F6732003AE269 /* AWSAppSyncAPIKeyAuthTests.swift */, 174F80AE2109229C00775D0D /* AWSAppSyncCognitoAuthTests.swift */, FA8C62C421D6E41600FF9924 /* Info.plist */, + FA0D825B22317C7900E0EA82 /* SubscriptionTests.swift */, FAA83BF7220A6AB40029FF7B /* testS3Object.jpg */, ); path = AWSAppSyncIntegrationTests; @@ -1568,6 +1577,7 @@ 17E009C61FCAB234005031DB /* GraphQLSelectionSet.swift in Sources */, FAFB258722051B2100068B38 /* AWSAppSyncClientInfo.swift in Sources */, 17E009C81FCAB234005031DB /* Locking.swift in Sources */, + FA0D825422307BA400E0EA82 /* AWSAppSyncSubscriptionWatcherStatus.swift in Sources */, FA3E128321D5290900F2D19A /* AWSAppSyncClientInfoError.swift in Sources */, FABD70692203C17A00C99B47 /* AWSAppSyncCache.swift in Sources */, 174F80922107E74F00775D0D /* AWSMQTTDecoder.m in Sources */, @@ -1576,6 +1586,7 @@ FA0C12BB21CD308A00B438CB /* AWSAppSyncClientConfiguration.swift in Sources */, 178B31081FCDB34100EA4619 /* AWSAppSyncClientConflictResolutionExtensions.swift in Sources */, 1729A0D01FA1365900F88594 /* AWSAppSyncSubscriptionWatcher.swift in Sources */, + FA0D82582230D0AF00E0EA82 /* AWSAppSyncSubscriptionError.swift in Sources */, FAAACC2421DD7AC600D24B37 /* InternalS3ObjectDetails.swift in Sources */, 17E009D91FCAB234005031DB /* JSONStandardTypeConversions.swift in Sources */, FA0C12C121CD3BB200B438CB /* BasicAWSAPIKeyAuthProvider.swift in Sources */, @@ -1635,6 +1646,7 @@ FA902D1021D97EB100C4052F /* AWSAppSyncCognitoAuthTests.swift in Sources */, FA902D1321D97EC500C4052F /* SubscriptionStressTestHelper.swift in Sources */, FA902D0F21D97EAD00C4052F /* AWSAppSyncAPIKeyAuthTests.swift in Sources */, + FA0D825C22317C7900E0EA82 /* SubscriptionTests.swift in Sources */, ); runOnlyForDeploymentPostprocessing = 0; }; diff --git a/AWSAppSyncClient/AWSAppSyncClient.swift b/AWSAppSyncClient/AWSAppSyncClient.swift index da2674a4..3a31c52e 100644 --- a/AWSAppSyncClient/AWSAppSyncClient.swift +++ b/AWSAppSyncClient/AWSAppSyncClient.swift @@ -9,6 +9,8 @@ import AWSCore public typealias SubscriptionResultHandler = (_ result: GraphQLResult?, _ transaction: ApolloStore.ReadWriteTransaction?, _ error: Error?) -> Void +public typealias SubscriptionStatusChangeHandler = (AWSAppSyncSubscriptionWatcherStatus) -> Void + public typealias DeltaQueryResultHandler = (_ result: GraphQLResult?, _ transaction: ApolloStore.ReadWriteTransaction?, _ error: Error?) -> Void public typealias OptimisticResponseBlock = (ApolloStore.ReadWriteTransaction?) -> Void @@ -17,23 +19,6 @@ public typealias MutationConflictHandler = (_ serverS internal let NoOpOperationString = "No-op" -public struct AWSAppSyncSubscriptionError: Error, LocalizedError { - let additionalInfo: String? - let errorDetails: [String: String]? - - public var errorDescription: String? { - return additionalInfo ?? "Unable to start subscription." - } - - public var recoverySuggestion: String? { - return errorDetails?["recoverySuggestion"] - } - - public var failureReason: String? { - return errorDetails?["failureReason"] - } -} - /// Delegates will be notified when a mutation is performed from the `mutationCallback`. This pattern is necessary /// in order to provide notifications of mutations which are performed after an app restart and the initial callback /// context has been lost. @@ -76,7 +61,6 @@ public class AWSAppSyncClient { self.autoSubmitOfflineMutations = appSyncConfig.autoSubmitOfflineMutations self.store = appSyncConfig.store - self.appSyncMQTTClient.allowCellularAccess = appSyncConfig.allowsCellularAccess self.presignedURLClient = appSyncConfig.presignedURLClient self.s3ObjectManager = appSyncConfig.s3ObjectManager self.subscriptionMetadataCache = appSyncConfig.subscriptionMetadataCache @@ -155,7 +139,10 @@ public class AWSAppSyncClient { return apolloClient!.watch(query: query, cachePolicy: cachePolicy, queue: queue, resultHandler: resultHandler) } - public func subscribe(subscription: Subscription, queue: DispatchQueue = DispatchQueue.main, resultHandler: @escaping SubscriptionResultHandler) throws -> AWSAppSyncSubscriptionWatcher? { + public func subscribe(subscription: Subscription, + queue: DispatchQueue = DispatchQueue.main, + statusChangeHandler: SubscriptionStatusChangeHandler? = nil, + resultHandler: @escaping SubscriptionResultHandler) throws -> AWSAppSyncSubscriptionWatcher? { return AWSAppSyncSubscriptionWatcher(client: self.appSyncMQTTClient, httpClient: self.httpTransport!, @@ -163,6 +150,7 @@ public class AWSAppSyncClient { subscriptionsQueue: self.subscriptionsQueue, subscription: subscription, handlerQueue: queue, + statusChangeHandler: statusChangeHandler, resultHandler: resultHandler) } @@ -174,6 +162,7 @@ public class AWSAppSyncClient { subscriptionsQueue: self.subscriptionsQueue, subscription: subscription, handlerQueue: queue, + statusChangeHandler: nil, connectedCallback: connectCallback, resultHandler: resultHandler) } diff --git a/AWSAppSyncClient/AWSAppSyncSubscriptionError.swift b/AWSAppSyncClient/AWSAppSyncSubscriptionError.swift new file mode 100644 index 00000000..64470b7d --- /dev/null +++ b/AWSAppSyncClient/AWSAppSyncSubscriptionError.swift @@ -0,0 +1,97 @@ +// +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// Licensed under the Amazon Software License +// http://aws.amazon.com/asl/ +// + +import Foundation + +public enum AWSAppSyncSubscriptionError: Error, LocalizedError { + /// The underlying MQTT client reported a status of "connectionError" + case connectionError + + /// The underlying MQTT client reported a status of "connectionRefused" + case connectionRefused + + /// The underlying MQTT client reported a status of "disconnected" + case disconnected + + /// An error occurred parsing the subscription message received from the service + case messageCallbackError(String) + + /// Some other error occurred. See associated value for details + case other(Error) + + /// An error occurred parsing the published subscription message + case parseError(Error) + + /// The underlying MQTT client reported a status of "protocolError" + case protocolError + + /// An error occurred while making the initial subscription request to AppSync, parsing its response, or + /// evaluating the response's subscription info payload + case setupError(String) + + /// The underlying MQTT client reported a status of "unknown" + case unknownMQTTConnectionStatus + + public var errorDescription: String? { + switch self { + case .messageCallbackError(let message): + return message + case .other(let error): + return error.localizedDescription + case .parseError(let error): + return error.localizedDescription + case .setupError(let message): + return message + case .unknownMQTTConnectionStatus: + return "MQTT status unknown" + default: + return "Subscription Terminated." + } + } + + public var recoverySuggestion: String? { + switch self { + case .other(let error as NSError): + return error.localizedRecoverySuggestion + case .parseError, .unknownMQTTConnectionStatus: + return nil + default: + return "Restart subscription request." + } + } + + public var failureReason: String? { + switch self { + case .other(let error as NSError): + return error.localizedFailureReason + case .parseError, .unknownMQTTConnectionStatus: + return nil + case .setupError(let message): + return message + default: + return "Disconnected from service." + } + } +} + +extension AWSAppSyncSubscriptionError { + static func from(status: AWSIoTMQTTStatus) -> AWSAppSyncSubscriptionError? { + switch status { + case .connectionError: + return .connectionError + case .connectionRefused: + return .connectionRefused + case .disconnected: + return .disconnected + case .protocolError: + return .protocolError + case .unknown: + return .unknownMQTTConnectionStatus + default: + return nil + } + } +} diff --git a/AWSAppSyncClient/AWSAppSyncSubscriptionWatcher.swift b/AWSAppSyncClient/AWSAppSyncSubscriptionWatcher.swift index 82c9139f..04f62ed8 100644 --- a/AWSAppSyncClient/AWSAppSyncSubscriptionWatcher.swift +++ b/AWSAppSyncClient/AWSAppSyncSubscriptionWatcher.swift @@ -7,15 +7,25 @@ import Dispatch import os.log -@objc protocol MQTTSubscriptionWatcher: AnyObject { +/// A protocol to allow our Swift AWSAppSyncSubscriptionWatcher class to be referenced by the Objective-C +/// AWSIoTMQTTClient. +@objc protocol MQTTSubscriptionWatcher { func getIdentifier() -> Int + func getTopics() -> [String] + func messageCallbackDelegate(data: Data) + func disconnectCallbackDelegate(error: Error) + func connectedCallbackDelegate() + + func statusChangeDelegate(status: AWSIoTMQTTStatus) + + func subscriptionAcknowledgementDelegate() } -class SubscriptionsOrderHelper { +private class SubscriptionsOrderHelper { var count = 0 var previousCall = Date() var pendingCount = 0 @@ -44,24 +54,37 @@ class SubscriptionsOrderHelper { /// A `AWSAppSyncSubscriptionWatcher` is responsible for watching the subscription, and calling the result handler with a new result whenever any of the data is published on the MQTT topic. It also normalizes the cache before giving the callback to customer. public final class AWSAppSyncSubscriptionWatcher: MQTTSubscriptionWatcher, Cancellable { - - weak var client: AppSyncMQTTClient? - weak var httpClient: AWSNetworkTransport? - var subscription: Subscription? - let handlerQueue: DispatchQueue - var resultHandler: SubscriptionResultHandler? - var connectedCallback: (() -> Void)? - internal var subscriptionTopic: [String]? - let store: ApolloStore - public let uniqueIdentifier = SubscriptionsOrderHelper.sharedInstance.getLatestCount() - internal var isCancelled: Bool = false - - init(client: AppSyncMQTTClient, httpClient: AWSNetworkTransport, store: ApolloStore, subscriptionsQueue: DispatchQueue, subscription: Subscription, handlerQueue: DispatchQueue, connectedCallback: (() -> Void)? = nil, resultHandler: @escaping SubscriptionResultHandler) { + + private weak var client: AppSyncMQTTClient? + private weak var httpClient: AWSNetworkTransport? + private let subscription: Subscription + private let handlerQueue: DispatchQueue + private var resultHandler: SubscriptionResultHandler? + private var connectedCallback: (() -> Void)? + private var statusChangeHandler: SubscriptionStatusChangeHandler? + private let store: ApolloStore + private var isCancelled: Bool = false + private var subscriptionTopic: [String]? + + private let uniqueIdentifier = SubscriptionsOrderHelper.sharedInstance.getLatestCount() + private var status = AWSAppSyncSubscriptionWatcherStatus.connecting + + init(client: AppSyncMQTTClient, + httpClient: AWSNetworkTransport, + store: ApolloStore, + subscriptionsQueue: DispatchQueue, + subscription: Subscription, + handlerQueue: DispatchQueue, + statusChangeHandler: SubscriptionStatusChangeHandler? = nil, + connectedCallback: (() -> Void)? = nil, + resultHandler: @escaping SubscriptionResultHandler) { + AppSyncLog.verbose("Subscribing to operation \(subscription)") self.client = client self.httpClient = httpClient self.store = store self.subscription = subscription self.handlerQueue = handlerQueue + self.statusChangeHandler = statusChangeHandler self.connectedCallback = connectedCallback self.resultHandler = { (result, transaction, error) in handlerQueue.async { @@ -83,7 +106,7 @@ public final class AWSAppSyncSubscriptionWatcher Void) { do { - _ = try self.httpClient?.sendSubscriptionRequest(operation: subscription!, completionHandler: {[weak self] (response, error) in - guard let self = self else {return} - guard self.isCancelled == false else {return} - if let response = response { - do { - let subscriptionResult = try AWSGraphQLSubscriptionResponseParser(body: response).parseResult() - if let subscriptionInfo = subscriptionResult.subscriptionInfo { - self.subscriptionTopic = subscriptionResult.newTopics - self.client?.addWatcher(watcher: self, topics: subscriptionResult.newTopics!, identifier: self.uniqueIdentifier) - self.client?.startSubscriptions(subscriptionInfo: subscriptionInfo, identifier: self.uniqueIdentifier.description) - } - completionHandler(true, nil) - } catch { - completionHandler(false, AWSAppSyncSubscriptionError(additionalInfo: error.localizedDescription, errorDetails: nil)) - } - } else if let error = error { - completionHandler(false, AWSAppSyncSubscriptionError(additionalInfo: error.localizedDescription, errorDetails: nil)) + _ = try httpClient?.sendSubscriptionRequest(operation: subscription, completionHandler: {[weak self] (response, error) in + AppSyncLog.debug("Received AWSGraphQLSubscriptionResponse") + + guard let self = self else { + return } + + guard self.isCancelled == false else { + return + } + + guard error == nil else { + AppSyncLog.error("Unexpected error in subscription request: \(error!)") + completionHandler(false, AWSAppSyncSubscriptionError.setupError(error!.localizedDescription)) + return + } + + guard let response = response else { + let message = "Response unexpectedly nil subscribing \(self.getIdentifier())" + AppSyncLog.error(message) + let error = AWSAppSyncSubscriptionError.setupError(message) + completionHandler(false, error) + return + } + + let subscriptionResult: AWSGraphQLSubscriptionResponse + do { + subscriptionResult = try AWSGraphQLSubscriptionResponseParser(body: response).parseResult() + } catch { + AppSyncLog.error("Error parsing subscription result: \(error)") + completionHandler(false, AWSAppSyncSubscriptionError.setupError(error.localizedDescription)) + return + } + + guard let subscriptionInfo = subscriptionResult.subscriptionInfo else { + let message = "Subscription info unexpectedly nil in subscription result \(self.getIdentifier())" + AppSyncLog.error(message) + let error = AWSAppSyncSubscriptionError.setupError(message) + completionHandler(false, error) + return + } + + AppSyncLog.verbose("New subscription set: \(subscriptionInfo.count)") + + self.subscriptionTopic = subscriptionResult.newTopics + AppSyncLog.debug("Subscription watcher \(self.getIdentifier()) now watching topics: \(self.subscriptionTopic ?? [])") + + self.client?.add(watcher: self, forNewTopics: subscriptionResult.newTopics!) + + self.client?.startSubscriptions(subscriptionInfos: subscriptionInfo, identifier: self.uniqueIdentifier) + + completionHandler(true, nil) }) } catch { - completionHandler(false, AWSAppSyncSubscriptionError(additionalInfo: error.localizedDescription, errorDetails: nil)) + AppSyncLog.error("Error performing subscription request: \(error)") + completionHandler(false, AWSAppSyncSubscriptionError.setupError(error.localizedDescription)) } } func getTopics() -> [String] { return subscriptionTopic ?? [String]() } + + deinit { + // call cancel here before exiting + cancel() + } + /// Cancel any in progress fetching operations and unsubscribe from the messages. After canceling, no updates will + /// be delivered to the result handler or status change handler. + /// + /// Internally, this method sets an `isCancelled` flag to prevent any future activity, and issues a + /// `cancelSubscription` on the client to cancel subscriptions on the service. It also releases retained handler + /// blocks and clients. + /// + /// Specifically, this means that cancelling a subscription watcher will not invoke `statusChangeHandler` or + /// `resultHandler`, although it will set the internal state of the watcher to `.disconnected` + public func cancel() { + isCancelled = true + status = .disconnected + client?.cancelSubscription(for: self) + client = nil + httpClient = nil + resultHandler = nil + statusChangeHandler = nil + subscriptionTopic = nil + } + + // MARK: - MQTTSubscriptionWatcher + func disconnectCallbackDelegate(error: Error) { self.resultHandler?(nil, nil, error) } - + func connectedCallbackDelegate() { - AppSyncLog.debug("DS: connectedCallback attempted. connected callback is null: \(connectedCallback == nil)") + AppSyncLog.debug("MQTT connectedCallback \(connectedCallback == nil ? "" : "(callback is null)")") connectedCallback?() } - + func messageCallbackDelegate(data: Data) { do { - AppSyncLog.verbose("Received message in messageCallbackDelegate") - + AppSyncLog.debug("Received message") + AppSyncLog.verbose("First 128 bytes of message data is [\(data.prefix(upTo: 128))]") + guard String(data: data, encoding: .utf8) != nil else { - AppSyncLog.error("Unable to convert message data to String using UTF8 encoding") - AppSyncLog.debug("Message data is [\(data)]") + let error = AWSAppSyncSubscriptionError.messageCallbackError("Unable to convert message data to String using UTF8 encoding") + AppSyncLog.error(error) + self.resultHandler?(nil, nil, error) return } - + + // If `deserialize` throws an error, it will be caught in the `catch` block below. If it + // succeeds, but the result cannot be cast to a JSON object, we'll handle it inside the body + // of the guard statement. guard let jsonObject = try JSONSerializationFormat.deserialize(data: data) as? JSONObject else { - AppSyncLog.error("Unable to deserialize message data") - AppSyncLog.debug("Message data is [\(data)]") + let error = AWSAppSyncSubscriptionError.messageCallbackError("Unable to deserialize message data") + AppSyncLog.error(error) + self.resultHandler?(nil, nil, error) return } - - let response = GraphQLResponse(operation: subscription!, body: jsonObject) - + + let response = GraphQLResponse(operation: subscription, body: jsonObject) + firstly { try response.parseResult(cacheKeyForObject: self.store.cacheKeyForObject) }.andThen { (result, records) in _ = self.store.withinReadWriteTransaction { transaction in self.resultHandler?(result, transaction, nil) } - + if let records = records { self.store.publish(records: records, context: nil).catch { error in preconditionFailure(String(describing: error)) } } }.catch { error in - self.resultHandler?(nil, nil, error) + self.resultHandler?(nil, nil, AWSAppSyncSubscriptionError.parseError(error)) } } catch { - self.resultHandler?(nil, nil, error) + self.resultHandler?(nil, nil, AWSAppSyncSubscriptionError.parseError(error)) } } - - deinit { - // call cancel here before exiting - cancel() - } - - /// Cancel any in progress fetching operations and unsubscribe from the messages. - public func cancel() { - self.isCancelled = true - client?.stopSubscription(subscription: self, subscriptionId: uniqueIdentifier.description) - self.client = nil - self.httpClient = nil - self.resultHandler = nil - self.subscriptionTopic = nil - self.subscription = nil + + /// The watcher has received a status update for the underlying MQTT client. This method will translate the incoming + /// status + /// + /// - Parameter status: The new AWSIoTMQTTStatus. This will be resolved to a AWSAppSyncSubscriptionStatus and trigger the notification handler + func statusChangeDelegate(status: AWSIoTMQTTStatus) { + let subscriptionWatcherStatus = status.toSubscriptionWatcherStatus + statusChangeHandler?(subscriptionWatcherStatus) } + + /// The underlying client has received a subscription acknowledgement from the broker. This means the watcher is now + /// receiving subscriptions. This is the only code path that can set the status to `.connected`. + func subscriptionAcknowledgementDelegate() { + guard !isCancelled else { + return + } + + status = .connected + statusChangeHandler?(status) + } + } diff --git a/AWSAppSyncClient/AWSAppSyncSubscriptionWatcherStatus.swift b/AWSAppSyncClient/AWSAppSyncSubscriptionWatcherStatus.swift new file mode 100644 index 00000000..6395584f --- /dev/null +++ b/AWSAppSyncClient/AWSAppSyncSubscriptionWatcherStatus.swift @@ -0,0 +1,66 @@ +// +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// Licensed under the Amazon Software License +// http://aws.amazon.com/asl/ +// + +import Foundation + +/// The status of a SubscriptionWatcher +public enum AWSAppSyncSubscriptionWatcherStatus { + /// The subscription is in process of connecting + case connecting + + /// The subscription has connected and is receiving events from the service + case connected + + /// The subscription has been disconnected because of a lifecycle event or manual disconnect request + case disconnected + + /// The subscription is in an error state. The enum's associated value will provide more details, including recovery options if available. + case error(AWSAppSyncSubscriptionError) +} + +extension AWSIoTMQTTStatus: CustomStringConvertible { + public var description: String { + switch self { + case .connected: + return "connected" + case .connecting: + return "connecting" + case .connectionError: + return "connectionError" + case .connectionRefused: + return "connectionRefused" + case .disconnected: + return "disconnected" + case .protocolError: + return "protocolError" + case .unknown: + return "unknown" + } + } + + /// Convert to the equivalent SubscriptionWatcherStatus. Note that there is no equivalent status to + /// SubscriptionWatcherStatus.connected -- that can only be set upon receipt of a SUBACK from the service. + var toSubscriptionWatcherStatus: AWSAppSyncSubscriptionWatcherStatus { + switch self { + case .connected: + return .connecting + case .connecting: + return .connecting + case .connectionError: + return .error(.connectionError) + case .connectionRefused: + return .error(.connectionRefused) + case .disconnected: + return .error(.disconnected) + case .protocolError: + return .error(.protocolError) + case .unknown: + return .error(.unknownMQTTConnectionStatus) + } + + } + +} diff --git a/AWSAppSyncClient/Deprecations+Removals.swift b/AWSAppSyncClient/Deprecations+Removals.swift index 9bff834d..1b8bc4e1 100644 --- a/AWSAppSyncClient/Deprecations+Removals.swift +++ b/AWSAppSyncClient/Deprecations+Removals.swift @@ -6,6 +6,31 @@ import Foundation +extension AWSAppSyncSubscriptionError { + @available(*, deprecated, message: "use errorDescription instead") + var additionalInfo: String? { + switch self { + case .connectionRefused, .connectionError, .protocolError, .disconnected: + return "Subscription Terminated." + case .parseError(let error): + return error.localizedDescription + default: + return nil + } + } + + @available(*, deprecated, message: "use recoverySuggestion and failureReason instead") + var errorDetails: [String: String]? { + switch self { + case .connectionRefused, .connectionError, .protocolError, .disconnected: + return ["recoverySuggestion": "Restart subscription request.", + "failureReason": "Disconnected from service."] + default: + return nil + } + } +} + extension AWSAppSyncClientError { @available(*, deprecated, message: "use the enum pattern matching instead") diff --git a/AWSAppSyncClient/Internal/AppSyncLogWrapper.swift b/AWSAppSyncClient/Internal/AppSyncLogWrapper.swift index 6feea051..9291462d 100644 --- a/AWSAppSyncClient/Internal/AppSyncLogWrapper.swift +++ b/AWSAppSyncClient/Internal/AppSyncLogWrapper.swift @@ -25,6 +25,10 @@ final class AppSyncLog { log(message, flag: .error, file: file, function: function, line: line) } + class func error(_ error: Error, file: String = #file, function: String = #function, line: Int = #line) { + log(error.localizedDescription, flag: .error, file: file, function: function, line: line) + } + private class func log(_ message: @autoclosure () -> String, flag: AWSDDLogFlag, file: String, function: String, line: Int) { if AppSyncLogHelper.shouldLog(flag: flag) { AppSyncLogHelper.log(message(), diff --git a/AWSAppSyncClient/Internal/AppSyncMQTTClient.swift b/AWSAppSyncClient/Internal/AppSyncMQTTClient.swift index 0a006481..bb26f991 100644 --- a/AWSAppSyncClient/Internal/AppSyncMQTTClient.swift +++ b/AWSAppSyncClient/Internal/AppSyncMQTTClient.swift @@ -5,123 +5,139 @@ import Foundation +/// A class that manages the associations amongst individual AWSIoTMQTTClients, their associated topics, and +/// the watchers that have registered subscriptions for those topics. This class is thread safe. class AppSyncMQTTClient: AWSIoTMQTTClientDelegate { - - var mqttClients = Set>() - var mqttClientsWithTopics = [AWSIoTMQTTClient: Set]() - var topicSubscribers = TopicSubscribers() - var allowCellularAccess = true - var scheduledSubscription: DispatchSourceTimer? - var subscriptionsQueue = DispatchQueue.global(qos: .userInitiated) - var cancelledSubscriptions = [String: Bool]() - - func receivedMessageData(_ data: Data!, onTopic topic: String!) { - self.subscriptionsQueue.async { [weak self] in - guard let self = self, let topics = self.topicSubscribers[topic] else { - return - } - - for subscribedTopic in topics { - subscribedTopic.messageCallbackDelegate(data: data) - } + + /// Queue for synchronizing state + private let concurrencyQueue = DispatchQueue(label: "com.amazonaws.AppSyncMQTTClient.concurrencyQueue", attributes: .concurrent) + + /// A set of subscriptions that have been requested to be stopped. Any future actions on them will be ignored. + private var cancelledSubscriptions = [Int: Bool]() + + /// A timer to start a subscription in the near future + private var scheduledSubscription: DispatchSourceTimer? + + /// The queue on which subscription callbacks (e.g., connected callbacks, data received callbacks) will be + /// invoked. + private var subscriptionsQueue = DispatchQueue.global(qos: .userInitiated) + + // Associations of topics to clients to watchers + + /// A map of MQTT clients to their associated topics + private var topicsByClient = [AWSIoTMQTTClient: Set]() + + /// MQTT clients that have been flagged for cancellation due to a reconnect, but are waiting to be released pending + /// a subscription acknowledgement from the service + private var expiringClientsByTopic = TopicWeakMap>() + + /// A map of topics to their associated watcher blocks + private var subscribersByTopic = TopicWeakMap() + + /// Adds a subscription watcher for new topics. This does not actually subscribe to the topics or create + /// a connection, it only registers interest in the topic + /// + /// - Parameters: + /// - watcher: The MQTTSubscriptionWatcher that will receive updates for the topics + /// - topics: The new topics to be watched + func add(watcher: MQTTSubscriptionWatcher, forNewTopics topics: [String]) { + concurrencyQueue.async(flags: .barrier) { [weak self] in + self?.subscribersByTopic.add(watcher, forTopics: topics) } } - - func connectionStatusChanged(_ status: AWSIoTMQTTStatus, client mqttClient: AWSIoTMQTTClient) { - self.subscriptionsQueue.async { [weak self] in - guard let self = self, let topics = self.mqttClientsWithTopics[mqttClient] else { + + /// Schedules subscription connection/reconnections to start for the specified AWSSubscriptionInfo objects. If the + /// watcher with `identifier` has already been cancelled, no subscriptions will be restarted. + /// + /// Internally, this method schedules the subscription connection/reconnection after a brief delay. This is to + /// allow the AppSync service to propagate some policy information to the PubSub broker, AWSIoT. + /// + /// When this method returns, the subscription is scheduled, but the client is not yet connected. The watcher + /// will receive a connection state callback when the initial socket is connected. + /// + /// - Parameters: + /// - subscriptionInfos: An array of AWSSubscriptionInfo objects to be subscribed to. + /// - identifier: The watcher that has registered interest in at least one of the topics in the + /// `subscriptionInfo` array. + func startSubscriptions(subscriptionInfos: [AWSSubscriptionInfo], identifier: Int) { + AppSyncLog.debug("Starting new subscriptions for watcher \(identifier)") + + scheduledSubscription = DispatchSource.makeOneOffDispatchSourceTimer(interval: .seconds(1), queue: subscriptionsQueue) { [weak self] in + self?.resetAndStartSubscriptions(subscriptionInfos: subscriptionInfos, identifier: identifier) + } + scheduledSubscription?.resume() + } + + /// Establishes new connections for each specified `subscriptionInfo`, as long as the watcher with `identifer` + /// has not yet been cancelled. + /// + /// Internally, this method: + /// - Create a candidate list of clients to expire, associated with their topics + /// - Start new subscriptions for the specified subscriptionInfo objects + /// - For each topic in subscriptionInfos, retain the expiring client in `expiringClientsByTopic` + /// - For each remaining client in the candidate list (in other words, clients that have no associated + /// topics), immediately cancel it since it is not associated with an active topic + /// - When the subscription acknowledgement is received for a topic, cancel and disconnect the old client, + /// and remove it from the expiring client list + /// + /// - Parameters: + /// - subscriptionInfos: An array of AWSSubscriptionInfo objects to be subscribed to. + /// - identifier: The watcher that has registered interest in at least one of the topics in the + /// `subscriptionInfo` array. + private func resetAndStartSubscriptions(subscriptionInfos: [AWSSubscriptionInfo], identifier: Int) { + + concurrencyQueue.async(flags: .barrier) { [weak self] in + guard let self = self else { return } - - if status.rawValue == 2 { - for topic in topics { - mqttClient.subscribe(toTopic: topic, qos: 1, extendedCallback: nil) + + AppSyncLog.debug("Starting \(subscriptionInfos.count) new clients; disconnecting \(self.topicsByClient.count) old clients") + + // Now that we've saved the old clients, clear the active clients. They will be repopulated during + // the startNewConnection + for (client, topics) in self.topicsByClient { + self.expiringClientsByTopic.add(client, forTopics: Array(topics)) + } + self.topicsByClient.removeAll() + + // If any of the new subscriptions are uncancelled, start new subscriptions before destroying + // the old ones + if self.shouldSubscribe(identifier: identifier) { + for subscriptionInfo in subscriptionInfos { + self.startNewConnection(for: subscriptionInfo) } - topics.map({ self.topicSubscribers[$0] }) - .compactMap({$0}) - .flatMap({$0}) - .forEach({$0.connectedCallbackDelegate()}) - } else if status.rawValue >= 3 { - let error = AWSAppSyncSubscriptionError( - additionalInfo: "Subscription Terminated.", - errorDetails: [ - "recoverySuggestion": "Restart subscription request.", - "failureReason": "Disconnected from service."]) - - topics.map({ self.topicSubscribers[$0] }) - .compactMap({$0}) - .flatMap({$0}) - .forEach({$0.disconnectCallbackDelegate(error: error)}) } } } - - func addWatcher(watcher: MQTTSubscriptionWatcher, topics: [String], identifier: Int) { - topicSubscribers.add(watcher: watcher, topics: topics) - } - - func startSubscriptions(subscriptionInfo: [AWSSubscriptionInfo], identifier: String) { - func createTimer(_ interval: Int, queue: DispatchQueue, block: @escaping () -> Void ) -> DispatchSourceTimer { - let timer = DispatchSource.makeTimerSource(flags: DispatchSource.TimerFlags(rawValue: 0), queue: queue) - #if swift(>=4) - timer.schedule(deadline: .now() + .seconds(interval)) - #else - timer.scheduleOneshot(deadline: .now() + .seconds(interval)) - #endif - timer.setEventHandler(handler: block) - timer.resume() - return timer - } - - self.scheduledSubscription = createTimer(1, queue: subscriptionsQueue, block: { [weak self] in - self?.resetAndStartSubscriptions(subscriptionInfo: subscriptionInfo, identifier: identifier) - }) - } - - func shouldSubscribe(subscriptionInfo: [AWSSubscriptionInfo], identifier: String) -> Bool { + + /// Returns `false` if the given subscription identifier has been cancelled. `true` indicates the identifier + /// is not cancelled and should be resubscribed. + /// + /// This method must be called from `concurrencyQueue`. + /// + /// - Parameter identifier: The subscription identifier + /// - Returns: `true` if the identifier should be resubscribed; `false` otherwise + private func shouldSubscribe(identifier: Int) -> Bool { if cancelledSubscriptions[identifier] != nil { cancelledSubscriptions[identifier] = true return false } return true } - - private func resetAndStartSubscriptions( - subscriptionInfo: [AWSSubscriptionInfo], identifier: String) { - var oldMQTTClients: [AWSIoTMQTTClient] = [] - - // Retain the old clients which we are going to replace with newer ones. - // We retain them so that we can have both old and new active clients active at the same time ensuring no messages are dropped. - // Once the new connections are active, we mute and disconnect the old clients. - for client in mqttClients { - oldMQTTClients.append(client) - } - - mqttClients.removeAll() - mqttClientsWithTopics.removeAll() - - // identify if we still need to establish new connection; if yes, we proceed, else return. - if shouldSubscribe(subscriptionInfo: subscriptionInfo, identifier: identifier) { - - for subscription in subscriptionInfo { - startNewSubscription(subscriptionInfo: subscription) - } - } - - // Mute the old clients by setting the delegate to nil - for client in oldMQTTClients { - client.clientDelegate = nil - } - - // Disconnect the old clients - for client in oldMQTTClients { - client.disconnect() - } - } - - private func startNewSubscription(subscriptionInfo: AWSSubscriptionInfo) { - let interestedTopics = subscriptionInfo.topics.filter({ topicSubscribers[$0] != nil }) + /// Start new subscriptions to the specified topics + /// + /// Internally, this method: + /// - Creates a new, unconnected MQTTClient and assigns `self` as its delegate + /// - Associates the client with the list of topics in `subscriptionInfo` that have a registered watcher + /// - Calls `connect` on the client + /// + /// This method must be called from `concurrencyQueue`. + /// + /// - Parameter subscriptionInfo: The SubscriptionInfo that contains the list of topics, URL and client ID for + /// the subscription + private func startNewConnection(for subscriptionInfo: AWSSubscriptionInfo) { + let interestedTopics = subscriptionInfo.topics.filter { subscribersByTopic[$0] != nil } guard !interestedTopics.isEmpty else { return @@ -130,100 +146,317 @@ class AppSyncMQTTClient: AWSIoTMQTTClientDelegate { let mqttClient = AWSIoTMQTTClient() mqttClient.clientDelegate = self - mqttClients.insert(mqttClient) - mqttClientsWithTopics[mqttClient] = Set(interestedTopics) + topicsByClient[mqttClient] = Set(interestedTopics) - mqttClient.connect(withClientId: subscriptionInfo.clientId, presignedURL: subscriptionInfo.url, statusCallback: nil) + mqttClient.connect(withClientId: subscriptionInfo.clientId, + presignedURL: subscriptionInfo.url, + statusCallback: nil) } - - internal func stopSubscription(subscription: MQTTSubscriptionWatcher, subscriptionId: String) { - self.topicSubscribers.remove(subscription: subscription) - self.cancelledSubscriptions[subscriptionId] = false - self.subscriptionsQueue.async { [weak self] in + /// Cancels the subscriptions currently registered for `watcher`. If cancelling a subscription removes the + /// last registered watcher for a given client connection, this method also disconnects the client. + /// + /// - Parameters: + /// - watcher: The watcher for which to cancel subscriptions + func cancelSubscription(for watcher: MQTTSubscriptionWatcher) { + let watcherId = watcher.getIdentifier() + + AppSyncLog.debug("Stopping watcher \(watcherId)") + + concurrencyQueue.async(flags: .barrier) { [weak self] in guard let self = self else { + AppSyncLog.verbose("### Unexpectedly no self in cancelSubscription") return } - - self.topicSubscribers.cleanUp(topicRemovedHandler: self.unsubscribeTopic) + self.subscribersByTopic.remove { $0.getIdentifier() == watcherId } + self.cancelledSubscriptions[watcherId] = false - for (client, _) in self.mqttClientsWithTopics.filter({ $0.value.isEmpty }) { - client.disconnect() - self.mqttClientsWithTopics.removeValue(forKey: client) - self.mqttClients.remove(client) + let unwatchedTopics = self.subscribersByTopic.removeUnassociatedTopics() + unwatchedTopics.forEach(self.unsubscribeTopic) + + let clientsWithNoTopics = self.removeClientsWithNoTopics() + + // Send the `disconnect` on the subscriptions queue since we don't need it for internal consistency + self.subscriptionsQueue.async { + clientsWithNoTopics.forEach { $0.disconnect() } } } } - - /// Unsubscribe topic + + /// Removes clients with no topics from internal storage, and returns the clients so removed. + /// + /// This method must be called from `concurrencyQueue`. + /// + /// - Returns: An array of clients without topics that have been removed from internal storage + private func removeClientsWithNoTopics() -> [AWSIoTMQTTClient] { + let clientsWithNoTopics = self.topicsByClient + .filter { $0.value.isEmpty } + .map { $0.key } + + for client in clientsWithNoTopics { + self.topicsByClient.removeValue(forKey: client) + } + + return clientsWithNoTopics + } + + /// Invokes `unsubscribeTopic` on the MQTTClient associated with `topic`, and removes the topic from the list + /// of registered topics for that client. + /// + /// This method must be called from `concurrencyQueue`. /// /// - Parameter topic: String private func unsubscribeTopic(topic: String) { - for (client, _) in mqttClientsWithTopics.filter({ $0.value.contains(topic) }) { + for (client, _) in topicsByClient.filter({ $0.value.contains(topic) }) { switch client.mqttStatus { case .connecting, .connected, .connectionError, .connectionRefused, .protocolError: client.unsubscribeTopic(topic) case .disconnected, .unknown: break } - mqttClientsWithTopics[client]?.remove(topic) + topicsByClient[client]?.remove(topic) } } - class TopicSubscribers { - - private var dictionary = [String: NSHashTable]() - - private var lock = NSLock() - - subscript(key: String) -> [MQTTSubscriptionWatcher]? { - return synchronized { - return self.dictionary[key]?.allObjects + // MARK: - AWSIoTMQTTClientDelegate + + /// Notifies subscribers' `messageCallbackDelegate`s of incoming data on the specified topic + /// + /// - Parameters: + /// - data: The data received on the topic + /// - topic: The topic receiving the data + func receivedMessageData(_ data: Data!, onTopic topic: String!) { + concurrencyQueue.sync { + guard let subscribers = subscribersByTopic[topic] else { + return } - } - - func add(watcher: MQTTSubscriptionWatcher, topics: [String]) { - synchronized { - for topic in topics { - if let watchers = self.dictionary[topic] { - watchers.add(watcher) - } else { - let watchers = NSHashTable.weakObjects() - watchers.add(watcher) - self.dictionary[topic] = watchers - } + + subscriptionsQueue.async { + for subscriber in subscribers { + subscriber.messageCallbackDelegate(data: data) } } } - - func remove(subscription: MQTTSubscriptionWatcher) { - synchronized { - dictionary.forEach({ (element) in - element.value.allObjects.filter({ $0.getIdentifier() == subscription.getIdentifier() }).forEach({ (watcher) in - element.value.remove(watcher) - }) - }) + } + + /// Notifies subscribers of a status change on `mqttClient`'s connection + /// + /// - Parameters: + /// - status: The new status of the client + /// - mqttClient: The client affected by the status change + func connectionStatusChanged(_ status: AWSIoTMQTTStatus, client mqttClient: AWSIoTMQTTClient) { + AppSyncLog.debug("\(mqttClient.clientId ?? "(no clientId)"): \(status)") + concurrencyQueue.sync { + notifyStatusCallbackDelegates(for: mqttClient, ofNewStatus: status) + + switch status { + case .connecting: + break + case .connected: + subscribeToTopicsAndNotifyConnectedCallbackDelegates(for: mqttClient) + default: + notifyDisconnectCallbackDelegates(for: mqttClient, ofNewStatus: status) } } - - func cleanUp(topicRemovedHandler: (String) -> Void) { - let unusedTopics: [String] = synchronized { - let unusedTopics = dictionary - .filter({ $0.value.allObjects.isEmpty }) - .map({ $0.key }) - unusedTopics.forEach({ - dictionary.removeValue(forKey: $0) - }) - - return unusedTopics + } + + /// Notifies watchers of a status change in its underlying client. Because of the mapping between subscription + /// watchers, topics and clients, a status change on a single client may alert multiple subscription watchers + /// of the same status event. + /// + /// - Parameters: + /// - Parameter mqttClient: The client that received the status update + /// - status: The new status + private func notifyStatusCallbackDelegates(for mqttClient: AWSIoTMQTTClient, + ofNewStatus status: AWSIoTMQTTStatus) { + guard let topics = topicsByClient[mqttClient], !topics.isEmpty else { + return + } + + let subscribers = subscribersByTopic.elements(for: topics) + + subscriptionsQueue.async { + subscribers.forEach { $0.statusChangeDelegate(status: status) } + } + } + + /// Subscribes to the topics registered as with `mqttClient`, and notifies connected callback delegates that + /// the client is now connected. + /// + /// **NOTE**: A connection notification does not mean that the client is receiving subscription messages, only + /// that the initial connection to the broker is established. Delegates that need to know when a watcher is actually + /// ready to receive messages should create a subscription watcher with a status callback. + /// + /// This method must be called from `concurrencyQueue`. + /// + /// - Parameter mqttClient: The client being subscribed + private func subscribeToTopicsAndNotifyConnectedCallbackDelegates(for mqttClient: AWSIoTMQTTClient) { + guard let topics = topicsByClient[mqttClient], !topics.isEmpty else { + return + } + for topic in topics { + let ackCallback: () -> Void = { [weak self] in + self?.handleSubscriptionAcknowledgement(for: topic) } - unusedTopics.forEach(topicRemovedHandler) + + mqttClient.subscribe(toTopic: topic, qos: 1, extendedCallback: nil, ackCallback: ackCallback) } - - func synchronized(_ body: () throws -> T) rethrows -> T { - lock.lock() - defer { lock.unlock() } - return try body() + + let subscribers = subscribersByTopic.elements(for: topics) + + subscriptionsQueue.async { + subscribers.forEach { $0.connectedCallbackDelegate() } + } + } + + /// When we get a subscription acknowledgement: + /// - Clear the client delegate for any expiring client associated with that topic + /// - Disconnect expiring client + /// - Remove our reference to the expired client in all topics + + private func handleSubscriptionAcknowledgement(for topic: String) { + AppSyncLog.debug("Topic has been subscribed \(topic)") + + clearTopicFromExpiredClientsAndCleanup(topic: topic) + + let subscribers = subscribersByTopic.elements(for: [topic]) + + guard !subscribers.isEmpty else { + return + } + + subscriptionsQueue.async { + subscribers.forEach { $0.subscriptionAcknowledgementDelegate() } } + + } + + /// Removes MQTT clients that have been flagged as expired, and whose topics are handled by another client + /// + /// This method must be called from `concurrencyQueue`. + /// + private func clearTopicFromExpiredClientsAndCleanup(topic: String) { + concurrencyQueue.async(flags: .barrier) { + guard let expiringClients = self.expiringClientsByTopic[topic] else { + return + } + + for client in expiringClients { + client.clientDelegate = nil + client.disconnect() + } + + let expiringClientIds = Set(expiringClients.map { $0.clientId }) + self.expiringClientsByTopic.remove { expiringClientIds.contains($0.clientId) } + self.expiringClientsByTopic.removeUnassociatedTopics() + } + } + + /// Notifies subscribers' `disconnectCallbackDelegate` of a disconnection that was not user-requested + /// + /// This method must be called from `concurrencyQueue`. + /// + /// - Parameter mqttClient: The client that received the disconnect error + private func notifyDisconnectCallbackDelegates(for mqttClient: AWSIoTMQTTClient, ofNewStatus status: AWSIoTMQTTStatus) { + // If the incoming status doesn't represent an error condition, no notification + // is necessary + guard let error = AWSAppSyncSubscriptionError.from(status: status) else { + return + } + + guard let topics = topicsByClient[mqttClient], !topics.isEmpty else { + return + } + + let subscribers = subscribersByTopic.elements(for: topics) + + guard !subscribers.isEmpty else { + return + } + + subscriptionsQueue.async { + subscribers.forEach { $0.disconnectCallbackDelegate(error: error) } + } + } + + /// Reverse-maps the existing mqttClientsWithTopics map to be keyed by topic, with a set of associated + /// clients. + /// + /// - Returns: A map of clients by topic + private func clientsByTopic() -> [String: Set>] { + var clientsByTopic = [String: Set>]() + for (client, topics) in topicsByClient { + for topic in topics { + var oldClients = clientsByTopic[topic] ?? [] + oldClients.insert(client) + clientsByTopic[topic] = oldClients + } + } + return clientsByTopic + } + +} + +/// A structure to maintain a map of weak references to elements, keyed by topic. +/// +/// Note: This class is not thread safe. Callers are responsible for managing concurrency +private class TopicWeakMap { + private var dictionary = [String: NSHashTable]() + + subscript(key: String) -> [T]? { + return dictionary[key]?.allObjects + } + + /// Adds an element to the array of associated objects for each topic in `topics` + /// + /// - Parameters: + /// - element: The element associated with `topics` + /// - topics: The array of topics with which `element` is associated + func add(_ element: T, forTopics topics: [String]) { + for topic in topics { + if let elements = dictionary[topic] { + elements.add(element) + } else { + let elements = NSHashTable.weakObjects() + elements.add(element) + dictionary[topic] = elements + } + } + } + + /// Scans each topic and removes elements matching the predicate + /// + /// - Parameter where: A predicate to test each element + func remove(where: (T) -> Bool) { + for elements in dictionary.values { + let elementsToRemove = elements.allObjects.filter(`where`) + elementsToRemove.forEach { elements.remove($0) } + } + } + + /// Cleans topics with no associated elements from storage, and returns the list of topics + /// + /// - Returns: A list of topics that have no matching elements + @discardableResult func removeUnassociatedTopics() -> [String] { + let unusedTopics = dictionary + .filter { $0.value.allObjects.isEmpty } + .map { $0.key } + + unusedTopics.forEach { + dictionary.removeValue(forKey: $0) + } + + return unusedTopics + } + + /// Returns an array of elements that are associated with at least one member of `topics` + /// + /// - Parameter topics: The topics for which to return associated elements + /// - Returns: An array of elements that are associated with at least one member of `topics` + func elements(for topics: Set) -> [T] { + let elements = topics.map { self[$0] } + .compactMap { $0 } + .flatMap { $0 } + return elements } } diff --git a/AWSAppSyncClient/Internal/AppSyncSubscriptionWithSync.swift b/AWSAppSyncClient/Internal/AppSyncSubscriptionWithSync.swift index c4f5a24e..c7d50a58 100644 --- a/AWSAppSyncClient/Internal/AppSyncSubscriptionWithSync.swift +++ b/AWSAppSyncClient/Internal/AppSyncSubscriptionWithSync.swift @@ -212,18 +212,6 @@ final class AppSyncSubscriptionWithSync = { [weak self] (result, transaction, error) in // `subscribeWithConnectCallback` invokes `resultHandler` with an error if it encounters an error // during the connect phase. Handle that here by updating the success flag and signalling the @@ -237,11 +225,27 @@ final class AppSyncSubscriptionWithSync?, _ transaction: ApolloStore.ReadWriteTransaction?, _ error: Error?) { - if let error = error as? AWSAppSyncSubscriptionError, error.additionalInfo == "Subscription Terminated." { + if let error = error as? AWSAppSyncSubscriptionError, error.shouldReconnectSubscription { // Do not give the developer a disconnect callback here. We have to retry the subscription once app // comes from background to foreground or internet becomes available. AppSyncLog.debug("Subscription terminated. Waiting for network to restart.") @@ -471,7 +475,7 @@ final class AppSyncSubscriptionWithSync Void ) -> DispatchSourceTimer { - let timer = DispatchSource.makeTimerSource(flags: DispatchSource.TimerFlags(rawValue: 0), queue: queue) - #if swift(>=4) - timer.schedule(deadline: deadline) - #else - timer.scheduleOneshot(deadline: deadline) - #endif - timer.setEventHandler(handler: block) - return timer - } - /// This function generates a unique identifier hash for the combination of specified parameters in the /// supplied GraphQL operations. The hash is always same for the same set of operations. /// - Returns: The unique hash for the specified queries & subscription. @@ -585,3 +573,17 @@ final class AppSyncSubscriptionWithSync Void ) -> DispatchSourceTimer { + let deadline = DispatchTime.now() + interval + return makeOneOffDispatchSourceTimer(deadline: deadline, queue: queue, block: block) + } + + /// Convenience function to encapsulate creation of a one-off DispatchSourceTimer for different versions of Swift + /// - Parameters: + /// - deadline: The time to fire the timer + /// - queue: The queue on which the timer should perform its block + /// - block: The block to invoke when the timer is fired + static func makeOneOffDispatchSourceTimer(deadline: DispatchTime, queue: DispatchQueue, block: @escaping () -> Void ) -> DispatchSourceTimer { + let timer = DispatchSource.makeTimerSource(flags: DispatchSource.TimerFlags(rawValue: 0), queue: queue) + #if swift(>=4) + timer.schedule(deadline: deadline) + #else + timer.scheduleOneshot(deadline: deadline) + #endif + timer.setEventHandler(handler: block) + return timer + } + +} diff --git a/AWSAppSyncClient/MQTTSDK/AWSIoTMQTTClient.m b/AWSAppSyncClient/MQTTSDK/AWSIoTMQTTClient.m index 4cdf0fb0..4f65d2ef 100644 --- a/AWSAppSyncClient/MQTTSDK/AWSIoTMQTTClient.m +++ b/AWSAppSyncClient/MQTTSDK/AWSIoTMQTTClient.m @@ -346,6 +346,7 @@ - (BOOL) connectWithCert { [self.streamsThread cancel]; } self.streamsThread = [[NSThread alloc] initWithTarget:self selector:@selector(openStreams:) object:nil]; + self.streamsThread.name = [NSString stringWithFormat:@"AWSIoTMQTTClient streamsThread %@", self.clientId]; [self.streamsThread start]; return YES; } @@ -360,8 +361,9 @@ - (BOOL) connectWithClientId:(NSString *)clientId willRetainFlag:(BOOL)willRetainFlag statusCallback:(void (^)(AWSIoTMQTTStatus status))callback; { - if (self.userDidIssueConnect ) { + if (self.userDidIssueConnect) { //Issuing connect multiple times. Not allowed. + AWSDDLogWarn(@"Connect already in progress, aborting"); return NO; } @@ -405,7 +407,7 @@ - (BOOL)connectWithClientId:(NSString *)clientId } - (BOOL) webSocketConnectWithClientId { - AWSDDLogInfo(@"AWSIoTMQTTClient: connecting via websocket. "); + AWSDDLogInfo(@"AWSIoTMQTTClient(%@): connecting via websocket", self.clientId); if ( self.webSocket ) { [self.webSocket close]; @@ -421,7 +423,7 @@ - (BOOL) webSocketConnectWithClientId { dispatch_async(dispatch_get_global_queue( DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^(void){ [self initWebSocketConnectionForURL:self.presignedURL]; }); - + } else { //Get Credentials from credentials provider. [[self.configuration.credentialsProvider credentials] continueWithBlock:^id _Nullable(AWSTask * _Nonnull task) { @@ -431,7 +433,7 @@ - (BOOL) webSocketConnectWithClientId { self.reconnectThread = [[NSThread alloc] initWithTarget:self selector:@selector(initiateReconnectTimer:) object:nil]; [self.reconnectThread start]; - AWSDDLogError(@"Unable to connect to MQTT due to an error fetching credentials from the Credentials Provider. Will try again in %f seconds", self.currentReconnectTime); + AWSDDLogError(@"(%@) Unable to connect to MQTT due to an error fetching credentials from the Credentials Provider. Will try again in %f seconds", self.clientId, self.currentReconnectTime); return nil; } @@ -499,12 +501,13 @@ - (void)initWebSocketConnectionForURL:(NSString *)urlString { [self.webSocket open]; // Now that the WebSocket is created and opened, it will send its delegate, i.e., this MQTTclient object the messages. - AWSDDLogVerbose(@"Websocket is created and opened."); + AWSDDLogVerbose(@"(%@) Websocket is created and opened", self.clientId); } - (void)disconnect { if (self.userDidIssueDisconnect ) { //Issuing disconnect multiple times. Turn this function into a noop by returning here. + AWSDDLogWarn(@"(%@) Disconnect already in progress, aborting", self.clientId); return; } @@ -527,11 +530,14 @@ - (void)disconnect { self.decoderReadStream = nil; self.decoderWriteStream = nil; [self.encoderStream close]; + [self.streamsThread cancel]; + self.reconnectTimer = nil; + [self.reconnectThread cancel]; + self.clientDelegate = nil; - - AWSDDLogInfo(@"AWSIoTMQTTClient: Disconnect message issued."); + AWSDDLogInfo(@"AWSIoTMQTTClient(%@): Disconnect message issued", self.clientId); } - (void)reconnectToSession { @@ -548,7 +554,7 @@ - (void)reconnectToSession { return; } - AWSDDLogInfo(@"Attempting to reconnect."); + AWSDDLogInfo(@"(%@) Attempting to reconnect", self.clientId); self.connectionAgeInSeconds = 0; if (self.connectionAgeTimer != nil ) { @@ -612,10 +618,11 @@ - (void)initiateReconnectTimer: (id) sender - (void)openStreams:(id)sender { + AWSDDLogVerbose(@"(%@) Opening streams", self.clientId); //This is invoked in a new thread by the webSocketDidOpen method or by the Connect method. Get the runLoop from the thread. NSRunLoop *runLoopForStreamsThread = [NSRunLoop currentRunLoop]; - //Setup a default timer to ensure that the RunLoop always has atleast one timer on it. This is to prevent the while loop + //Setup a default timer to ensure that the RunLoop always has at least one timer on it. This is to prevent the while loop //below to spin in tight loop when all input sources and session timers are shutdown during a reconnect sequence. NSTimer *defaultRunLoopTimer = [[NSTimer alloc] initWithFireDate:[NSDate dateWithTimeIntervalSinceNow:60.0] interval:60.0 @@ -633,7 +640,7 @@ - (void)openStreams:(id)sender [self.session connectToInputStream:self.decoderStream outputStream:self.encoderStream]; while (self.runLoopShouldContinue && NSThread.currentThread.isCancelled == NO) { - //This will continue run until runLoopShouldContinue is set to NO during "disconnect" or + //This will continue to run until runLoopShouldContinue is set to NO during "disconnect" or //"websocketDidFail" //Run one cycle of the runloop. This will return after a input source event or timer event is processed @@ -644,6 +651,7 @@ - (void)openStreams:(id)sender [defaultRunLoopTimer invalidate]; if (!self.runLoopShouldContinue ) { + AWSDDLogVerbose(@"(%@) Cleaning up runloop & streams", self.clientId); if (@available(iOS 10.0, *)) { [runLoopForStreamsThread performBlock:^{ if (self.connectionAgeTimer != nil ) { @@ -674,7 +682,7 @@ - (void)invalidateTimer { } - (void)timerHandler:(NSTimer*)theTimer { - AWSDDLogVerbose(@"ThreadID: [%@] Default run loop timer executed: RunLoopShouldContinue is [%d] and Cancelled is [%d]", [NSThread currentThread], self.runLoopShouldContinue, [[NSThread currentThread] isCancelled]); + AWSDDLogVerbose(@"ThreadID: [%@] Default run loop timer executed: runLoopShouldContinue is [%d] and Cancelled is [%d]", [NSThread currentThread], self.runLoopShouldContinue, [[NSThread currentThread] isCancelled]); } #pragma mark publish methods @@ -746,7 +754,7 @@ - (void)publishData:(NSData*)data format:@"Cannot specify `ackCallback` block for QoS = 0."]; } - AWSDDLogVerbose(@"isReadyToPublish: %i",[self.session isReadyToPublish]); + AWSDDLogVerbose(@"isReadyToPublish: %i", [self.session isReadyToPublish]); if (qos == 0) { [self.session publishData:data onTopic:topic]; } @@ -761,7 +769,9 @@ - (void)publishData:(NSData*)data #pragma mark subscribe methods -- (void)subscribeToTopic:(NSString*)topic qos:(UInt8)qos messageCallback:(AWSIoTMQTTNewMessageBlock)callback { +- (void)subscribeToTopic:(NSString*)topic + qos:(UInt8)qos + messageCallback:(AWSIoTMQTTNewMessageBlock)callback { [self subscribeToTopic:topic qos:qos messageCallback:callback @@ -769,7 +779,8 @@ - (void)subscribeToTopic:(NSString*)topic qos:(UInt8)qos messageCallback:(AWSIoT } -- (void)subscribeToTopic:(NSString*)topic qos:(UInt8)qos +- (void)subscribeToTopic:(NSString*)topic + qos:(UInt8)qos messageCallback:(AWSIoTMQTTNewMessageBlock)callback ackCallback:(AWSIoTMQTTAckBlock)ackCallBack { if (!_userDidIssueConnect) { @@ -781,7 +792,7 @@ - (void)subscribeToTopic:(NSString*)topic qos:(UInt8)qos [NSException raise:NSInternalInconsistencyException format:@"Cannot call subscribe after disconnecting from the server"]; } - AWSDDLogInfo(@"Subscribing to topic %@ with messageCallback", topic); + AWSDDLogInfo(@"(%@) Subscribing to topic %@ with messageCallback", self.clientId, topic); AWSIoTMQTTTopicModel *topicModel = [AWSIoTMQTTTopicModel new]; topicModel.topic = topic; topicModel.qos = qos; @@ -789,10 +800,10 @@ - (void)subscribeToTopic:(NSString*)topic qos:(UInt8)qos [self.topicListeners setObject:topicModel forKey:topic]; UInt16 messageId = [self.session subscribeToTopic:topicModel.topic atLevel:topicModel.qos]; - AWSDDLogVerbose(@"Now subscribing w/ messageId: %d", messageId); + AWSDDLogVerbose(@"(%@) Now subscribing w/ messageId: %d, qos: %u", self.clientId, messageId, qos); if (ackCallBack) { [self.ackCallbackDictionary setObject:ackCallBack - forKey:[NSNumber numberWithInt:messageId]]; + forKey:[NSNumber numberWithInt:messageId]]; } } @@ -819,7 +830,7 @@ - (void)subscribeToTopic:(NSString*)topic format:@"Cannot call subscribe after disconnecting from the server"]; } - AWSDDLogInfo(@"Subscribing to topic %@ with ExtendedmessageCallback", topic); + AWSDDLogInfo(@"(%@) Subscribing to topic %@ with ExtendedmessageCallback", self.clientId, topic); AWSIoTMQTTTopicModel *topicModel = [AWSIoTMQTTTopicModel new]; topicModel.topic = topic; topicModel.qos = qos; @@ -827,10 +838,10 @@ - (void)subscribeToTopic:(NSString*)topic topicModel.extendedCallback = callback; [self.topicListeners setObject:topicModel forKey:topic]; UInt16 messageId = [self.session subscribeToTopic:topicModel.topic atLevel:topicModel.qos]; - AWSDDLogVerbose(@"Now subscribing w/ messageId: %d", messageId); + AWSDDLogVerbose(@"(%@) Now subscribing w/ messageId: %d, qos: %u", self.clientId, messageId, qos); if (ackCallback) { [self.ackCallbackDictionary setObject:ackCallback - forKey:[NSNumber numberWithInt:messageId]]; + forKey:[NSNumber numberWithInt:messageId]]; } } @@ -845,7 +856,7 @@ - (void)unsubscribeTopic:(NSString*)topic [NSException raise:NSInternalInconsistencyException format:@"Cannot call unsubscribe after disconnecting from the server"]; } - AWSDDLogInfo(@"Unsubscribing from topic %@", topic); + AWSDDLogInfo(@"(%@) Unsubscribing from topic %@", self.clientId, topic); UInt16 messageId = [self.session unsubscribeTopic:topic]; [self.topicListeners removeObjectForKey:topic]; if (ackCallback) { @@ -862,31 +873,35 @@ - (void)unsubscribeTopic:(NSString*)topic { - (void)connectionAgeTimerHandler:(NSTimer*)theTimer { self.connectionAgeInSeconds++; - AWSDDLogVerbose(@"Connection Age: %ld", (long)self.connectionAgeInSeconds); + AWSDDLogVerbose(@"(%@) Connection age: %ld", self.clientId, (long)self.connectionAgeInSeconds); if (self.connectionAgeInSeconds >= self.minimumConnectionTime) { - AWSDDLogVerbose(@"Connection Age threshold reached. Resetting reconnect time to [%fs]", self.baseReconnectTime); + AWSDDLogVerbose(@"(%@) Connection age threshold reached. Resetting reconnect time to [%fs]", self.clientId, self.baseReconnectTime); self.currentReconnectTime = self.baseReconnectTime; [theTimer invalidate]; } } - (void)session:(AWSMQTTSession*)session handleEvent:(AWSMQTTSessionEvent)eventCode { - AWSDDLogVerbose(@"MQTTSessionDelegate handleEvent: %i",eventCode); + AWSDDLogVerbose(@"(%@) MQTTSessionDelegate handleEvent: %i", self.clientId, eventCode); switch (eventCode) { case AWSMQTTSessionEventConnected: - AWSDDLogInfo(@"MQTT session connected."); + AWSDDLogInfo(@"(%@) MQTT session connected", self.clientId); self.mqttStatus = AWSIoTMQTTStatusConnected; [self notifyConnectionStatus]; if (self.connectionAgeTimer != nil) { [self.connectionAgeTimer invalidate]; } - self.connectionAgeTimer = [ NSTimer scheduledTimerWithTimeInterval:1.0 target:self selector:@selector(connectionAgeTimerHandler:) userInfo:nil repeats:YES]; + self.connectionAgeTimer = [NSTimer scheduledTimerWithTimeInterval:1.0 + target:self + selector:@selector(connectionAgeTimerHandler:) + userInfo:nil + repeats:YES]; //Subscribe to prior topics if (_autoResubscribe) { - AWSDDLogInfo(@"Auto-resubscribe is enabled. Resubscribing to topics."); + AWSDDLogInfo(@"(%@) Auto-resubscribe is enabled. Resubscribing to topics.", self.clientId); for (AWSIoTMQTTTopicModel *topic in self.topicListeners.allValues) { [self.session subscribeToTopic:topic.topic atLevel:topic.qos]; } @@ -894,12 +909,13 @@ - (void)session:(AWSMQTTSession*)session handleEvent:(AWSMQTTSessionEvent)eventC break; case AWSMQTTSessionEventConnectionRefused: - AWSDDLogWarn(@"MQTT session refused."); + AWSDDLogWarn(@"(%@) MQTT session refused", self.clientId); self.mqttStatus = AWSIoTMQTTStatusConnectionRefused; [self notifyConnectionStatus]; break; + case AWSMQTTSessionEventConnectionClosed: - AWSDDLogInfo(@"MQTTSessionEventConnectionClosed: MQTT session closed."); + AWSDDLogInfo(@"(%@) MQTTSessionEventConnectionClosed: MQTT session closed", self.clientId); self.connectionAgeInSeconds = 0; if (self.connectionAgeTimer != nil ) { @@ -922,12 +938,15 @@ - (void)session:(AWSMQTTSession*)session handleEvent:(AWSMQTTSessionEvent)eventC [self notifyConnectionStatus]; //Retry - self.reconnectThread = [[NSThread alloc] initWithTarget:self selector:@selector(initiateReconnectTimer:) object:nil]; + self.reconnectThread = [[NSThread alloc] initWithTarget:self + selector:@selector(initiateReconnectTimer:) + object:nil]; + self.reconnectThread.name = [NSString stringWithFormat:@"AWSIoTMQTTClient reconnect %@", self.clientId]; [self.reconnectThread start]; } break; case AWSMQTTSessionEventConnectionError: - AWSDDLogError(@"MQTTSessionEventConnectionError: Received an MQTT session connection error"); + AWSDDLogError(@"(%@) MQTTSessionEventConnectionError: Received an MQTT session connection error", self.clientId); self.connectionAgeInSeconds = 0; if (self.connectionAgeTimer != nil ) { @@ -949,14 +968,15 @@ - (void)session:(AWSMQTTSession*)session handleEvent:(AWSMQTTSessionEvent)eventC //Retry self.reconnectThread = [[NSThread alloc] initWithTarget:self selector:@selector(initiateReconnectTimer:) object:nil]; + self.reconnectThread.name = [NSString stringWithFormat:@"AWSIoTMQTTClient reconnect %@", self.clientId]; [self.reconnectThread start]; } break; case AWSMQTTSessionEventProtocolError: - AWSDDLogError(@"MQTT session protocol error"); + AWSDDLogError(@"(%@) MQTT session protocol error", self.clientId); self.mqttStatus = AWSIoTMQTTStatusProtocolError; [self notifyConnectionStatus]; - AWSDDLogError(@"Disconnecting."); + AWSDDLogError(@"(%@) Disconnecting", self.clientId); [self disconnect]; break; default: @@ -966,8 +986,9 @@ - (void)session:(AWSMQTTSession*)session handleEvent:(AWSMQTTSessionEvent)eventC } #pragma mark subscription distributor + - (void)session:(AWSMQTTSession*)session newMessage:(NSData*)data onTopic:(NSString*)topic { - AWSDDLogVerbose(@"MQTTSessionDelegate newMessage: %@ onTopic: %@",[[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding], topic); + AWSDDLogVerbose(@"(%@) MQTTSessionDelegate newMessage: %@ onTopic: %@", self.clientId, [[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding], topic); NSArray *topicParts = [topic componentsSeparatedByString: @"/"]; @@ -993,24 +1014,24 @@ - (void)session:(AWSMQTTSession*)session newMessage:(NSData*)data onTopic:(NSStr } if (topicMatch) { - AWSDDLogVerbose(@"<<%@>>Topic: %@ is matched.",[NSThread currentThread], topic); + AWSDDLogVerbose(@"(%@) <<%@>>Topic: %@ is matched.", self.clientId, [NSThread currentThread], topic); AWSIoTMQTTTopicModel *topicModel = [self.topicListeners objectForKey:topicKey]; if (topicModel) { if (topicModel.callback != nil) { - AWSDDLogVerbose(@"<<%@>>topicModel.callback.", [NSThread currentThread]); + AWSDDLogVerbose(@"(%@) <<%@>>topicModel.callback.", self.clientId, [NSThread currentThread]); dispatch_async(dispatch_get_global_queue( DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^(void){ topicModel.callback(data); }); } if (topicModel.extendedCallback != nil) { - AWSDDLogVerbose(@"<<%@>>topicModel.extendedcallback.", [NSThread currentThread]); + AWSDDLogVerbose(@"(%@) <<%@>>topicModel.extendedcallback.", self.clientId, [NSThread currentThread]); dispatch_async(dispatch_get_global_queue( DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^(void){ topicModel.extendedCallback(self, topic, data); }); } if (self.clientDelegate != nil ) { - AWSDDLogVerbose(@"<<%@>>Calling receviedMessageData on client Delegate.", [NSThread currentThread]); + AWSDDLogVerbose(@"(%@) <<%@>>Calling receivedMessageData on client Delegate.", self.clientId, [NSThread currentThread]); dispatch_async(dispatch_get_global_queue( DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^(void){ [self.clientDelegate receivedMessageData:data onTopic:topic]; }); @@ -1022,14 +1043,14 @@ - (void)session:(AWSMQTTSession*)session newMessage:(NSData*)data onTopic:(NSStr } #pragma mark callback handler -- (void)session:(AWSMQTTSession*)session -newAckForMessageId:(UInt16)msgId { - AWSDDLogVerbose(@"MQTTSessionDelegate new ack for msgId: %d", msgId); + +- (void)session:(AWSMQTTSession*)session newAckForMessageId:(UInt16)msgId { + AWSDDLogVerbose(@"(%@) MQTTSessionDelegate new ack for msgId: %d", self.clientId, msgId); AWSIoTMQTTAckBlock callback = [[self ackCallbackDictionary] objectForKey:[NSNumber numberWithInt:msgId]]; - if(callback) { + if (callback) { // Give callback to the client on a background thread - dispatch_async(dispatch_get_global_queue( DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^(void){ + dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^(void){ callback(); }); [[self ackCallbackDictionary] removeObjectForKey:[NSNumber numberWithInt:msgId]]; @@ -1040,7 +1061,7 @@ - (void)session:(AWSMQTTSession*)session - (void)webSocketDidOpen:(AWSSRWebSocket *)webSocket; { - AWSDDLogInfo(@"Websocket did open and is connected."); + AWSDDLogInfo(@"(%@) Websocket did open and is connected", self.clientId); // The WebSocket is connected; at this point we need to create streams // for MQTT encode/decode and then instantiate the MQTT client. @@ -1065,18 +1086,19 @@ - (void)webSocketDidOpen:(AWSSRWebSocket *)webSocket; //Create Thread and start with "openStreams" being the entry point. if (self.streamsThread) { - AWSDDLogVerbose(@"Issued Cancel on thread [%@]", self.streamsThread); + AWSDDLogVerbose(@"(%@) Issued Cancel on thread [%@]", self.clientId, self.streamsThread); [self.streamsThread cancel]; } self.streamsThread = [[NSThread alloc] initWithTarget:self selector:@selector(openStreams:) object:nil]; + self.streamsThread.name = [NSString stringWithFormat:@"AWSIoTMQTTClient streamsThread %@", self.clientId]; [self.streamsThread start]; } - (void)webSocket:(AWSSRWebSocket *)webSocket didFailWithError:(NSError *)error; { - AWSDDLogError(@"didFailWithError: Websocket failed With Error %@", error); + AWSDDLogError(@"(%@) WebsocketdidFailWithError:%@", self.clientId, error); // The WebSocket has failed.The input/output streams can be closed here. // Also, the webSocket can be set to nil @@ -1100,20 +1122,20 @@ - (void)webSocket:(AWSSRWebSocket *)webSocket didReceiveMessage:(id)message; if ([message isKindOfClass:[NSData class]]) { NSData *messageData = (NSData *)message; - AWSDDLogVerbose(@"Websocket didReceiveMessage: Received %lu bytes", (unsigned long)messageData.length); + AWSDDLogVerbose(@"(%@) Websocket didReceiveMessage: Received %lu bytes", self.clientId, (unsigned long)messageData.length); // When a message is received, write it to the Decoder's input stream. [self.toDecoderStream write:[messageData bytes] maxLength:messageData.length]; } else { - AWSDDLogError(@"Expected NSData object, but got a %@ object instead.", NSStringFromClass([message class])); + AWSDDLogError(@"(%@) Websocket expected NSData object, but got a %@ object instead.", self.clientId, NSStringFromClass([message class])); } } - (void)webSocket:(AWSSRWebSocket *)webSocket didCloseWithCode:(NSInteger)code reason:(NSString *)reason wasClean:(BOOL)wasClean; { - AWSDDLogInfo(@"WebSocket closed with code:%ld with reason:%@", (long)code, reason); + AWSDDLogInfo(@"(%@) WebSocket closed with code:%ld with reason:%@", self.clientId, (long)code, reason); // The WebSocket has closed. The input/output streams can be closed here. // Also, the webSocket can be set to nil @@ -1134,7 +1156,7 @@ - (void)webSocket:(AWSSRWebSocket *)webSocket didCloseWithCode:(NSInteger)code r - (void)webSocket:(AWSSRWebSocket *)webSocket didReceivePong:(NSData *)pongPayload; { - AWSDDLogVerbose(@"Websocket received pong"); + AWSDDLogVerbose(@"(%@) Websocket received pong", self.clientId); } @end diff --git a/AWSAppSyncIntegrationTests/AWSAppSyncAPIKeyAuthTests.swift b/AWSAppSyncIntegrationTests/AWSAppSyncAPIKeyAuthTests.swift index 199d1d7c..a9603fbf 100644 --- a/AWSAppSyncIntegrationTests/AWSAppSyncAPIKeyAuthTests.swift +++ b/AWSAppSyncIntegrationTests/AWSAppSyncAPIKeyAuthTests.swift @@ -170,94 +170,6 @@ class AWSAppSyncAPIKeyAuthTests: XCTestCase { wait(for: [emptyCacheCompleted], timeout: AWSAppSyncAPIKeyAuthTests.networkOperationTimeout) } - func testSubscription_Stress() { - guard let appSyncClient = appSyncClient else { - XCTFail("appSyncClient should not be nil") - return - } - let subscriptionStressTestHelper = SubscriptionStressTestHelper() - subscriptionStressTestHelper.stressTestSubscriptions(with: appSyncClient) - } - - func testSubscription() throws { - let postCreated = expectation(description: "Post created successfully.") - let addPost = DefaultTestPostData.defaultCreatePostWithoutFileUsingParametersMutation - - var idHolder: GraphQLID? - appSyncClient?.perform(mutation: addPost, queue: AWSAppSyncAPIKeyAuthTests.mutationQueue) { result, error in - print("CreatePost result handler invoked") - XCTAssertNil(error) - XCTAssertNotNil(result?.data?.createPostWithoutFileUsingParameters?.id) - XCTAssertEqual(result!.data!.createPostWithoutFileUsingParameters?.author, DefaultTestPostData.author) - idHolder = result?.data?.createPostWithoutFileUsingParameters?.id - postCreated.fulfill() - } - wait(for: [postCreated], timeout: AWSAppSyncAPIKeyAuthTests.networkOperationTimeout) - - guard let id = idHolder else { - XCTFail("Expected ID from addPost mutation") - return - } - - // This handler will be invoked if an error occurs during the setup, or if we receive a successful mutation response. - let subscriptionResultHandlerInvoked = expectation(description: "Subscription received successfully.") - - let subscription = try self.appSyncClient?.subscribe(subscription: OnUpvotePostSubscription(id: id), - queue: AWSAppSyncAPIKeyAuthTests.subscriptionAndFetchQueue) { result, _, error in - print("Subscription result handler invoked") - guard error == nil else { - XCTAssertNil(error) - return - } - - guard result != nil else { - XCTFail("Result was unexpectedly nil") - return - } - subscriptionResultHandlerInvoked.fulfill() - } - XCTAssertNotNil(subscription, "Subscription expected to be non nil.") - - defer { - subscription?.cancel() - } - - // Currently, subscriptions don't have a good way to inspect that they have been registered on the service. - // We'll check for `getTopics` returning a non-empty value to stand in for a completion handler - let subscriptionIsRegisteredExpectation = expectation(description: "Upvote subscription should have a non-empty topics list") - let subscriptionGetTopicsTimer = Timer.scheduledTimer(withTimeInterval: 0.25, repeats: true) { - _ in - guard let subscription = subscription else { - return - } - - let topics = subscription.getTopics() - - guard !topics.isEmpty else { - return - } - - subscriptionIsRegisteredExpectation.fulfill() - } - wait(for: [subscriptionIsRegisteredExpectation], timeout: AWSAppSyncAPIKeyAuthTests.networkOperationTimeout) - subscriptionGetTopicsTimer.invalidate() - - print("Sleeping a few seconds to wait for server to begin delivering subscriptions") - sleep(5) - - let upvotePerformed = expectation(description: "Upvote mutation performed") - let upvoteMutation = UpvotePostMutation(id: id) - self.appSyncClient?.perform(mutation: upvoteMutation, queue: AWSAppSyncAPIKeyAuthTests.mutationQueue) { - result, error in - print("Received upvote mutation response.") - XCTAssertNil(error) - XCTAssertNotNil(result?.data?.upvotePost?.id) - upvotePerformed.fulfill() - } - - wait(for: [upvotePerformed, subscriptionResultHandlerInvoked], timeout: AWSAppSyncAPIKeyAuthTests.networkOperationTimeout) - } - func testOptimisticWriteWithQueryParameter() { let postCreated = expectation(description: "Post created successfully.") let successfulMutationEvent2Expectation = expectation(description: "Mutation done successfully.") @@ -334,79 +246,6 @@ class AWSAppSyncAPIKeyAuthTests: XCTestCase { wait(for: [successfulLocalQueryFetchExpectation], timeout: 5.0) } - func testSubscriptionIsInvokedOnProvidedQueue() throws { - let label = "testSyncOperationAtSetupAndReconnect.syncWatcherCallbackQueue" - let syncWatcherCallbackQueue = DispatchQueue(label: label) - let queueIdentityKey = DispatchSpecificKey() - let queueIdentityValue = label - syncWatcherCallbackQueue.setSpecific(key: queueIdentityKey, value: queueIdentityValue) - - let appSyncClient = try AWSAppSyncAPIKeyAuthTests.makeAppSyncClient(authType: self.authType) - - let postCreated = expectation(description: "Post created successfully.") - let addPost = DefaultTestPostData.defaultCreatePostWithoutFileUsingParametersMutation - var idHolder: GraphQLID? - appSyncClient.perform(mutation: addPost, queue: AWSAppSyncAPIKeyAuthTests.mutationQueue) { result, error in - print("CreatePost result handler invoked") - idHolder = result?.data?.createPostWithoutFileUsingParameters?.id - postCreated.fulfill() - } - wait(for: [postCreated], timeout: AWSAppSyncAPIKeyAuthTests.networkOperationTimeout) - - guard let id = idHolder else { - XCTFail("Expected ID from addPost mutation") - return - } - - let baseQueryResultHandler: (GraphQLResult?, Error?) -> Void = { _, _ in } - let deltaQueryResultHandler: (GraphQLResult?, ApolloStore.ReadWriteTransaction?, Error?) -> Void = { _, _, _ in } - - let subscriptionResultHandlerInvoked = expectation(description: "Subscription result handler invoked") - let subscriptionResultHandler: (GraphQLResult?, ApolloStore.ReadWriteTransaction?, Error?) -> Void = { _, _, _ in - subscriptionResultHandlerInvoked.fulfill() - let dispatchQueueValue = DispatchQueue.getSpecific(key: queueIdentityKey) - XCTAssertEqual(dispatchQueueValue, queueIdentityValue, "Expected callback to be invoked on provided queue") - } - - let listPostsQuery = ListPostsQuery() - let subscription = OnUpvotePostSubscription(id: id) - - let syncWatcher = appSyncClient.sync( - baseQuery: listPostsQuery, - baseQueryResultHandler: baseQueryResultHandler, - subscription: subscription, - subscriptionResultHandler: subscriptionResultHandler, - deltaQuery: listPostsQuery, - deltaQueryResultHandler: deltaQueryResultHandler, - callbackQueue: syncWatcherCallbackQueue, - syncConfiguration: SyncConfiguration() - ) - - defer { - syncWatcher.cancel() - } - - let upvote = UpvotePostMutation(id: id) - let upvoteComplete = expectation(description: "Upvote mutation completed") - - // Wait 3 seconds to ensure sync/subscription is active, then trigger the mutation - DispatchQueue.global().async { - sleep(3) - self.appSyncClient?.perform(mutation: upvote, - queue: AWSAppSyncAPIKeyAuthTests.mutationQueue) { _, _ in - upvoteComplete.fulfill() - } - } - - wait( - for: [ - upvoteComplete, - subscriptionResultHandlerInvoked, - ], - timeout: AWSAppSyncAPIKeyAuthTests.networkOperationTimeout - ) - } - // Validates that queries are invoked and returned as expected during initial setup and // reconnection flows func testSyncOperationAtSetupAndReconnect() throws { @@ -770,16 +609,6 @@ class AWSAppSyncAPIKeyAuthTests: XCTestCase { } - func testSubscriptionResultHandlerCanOperateOnEmptyCacheWithBackingDatabase() throws { - let rootDirectory = FileManager.default.temporaryDirectory.appendingPathComponent("emptyCacheTest-\(UUID().uuidString)") - let cacheConfiguration = try AWSAppSyncCacheConfiguration(withRootDirectory: rootDirectory) - try doSubscriptionResultHandlerTesting(withCacheConfiguration: cacheConfiguration) - } - - func testSubscriptionResultHandlerCanOperateOnEmptyCacheWithoutBackingDatabase() throws { - try doSubscriptionResultHandlerTesting(withCacheConfiguration: nil) - } - // MARK: - Utilities static func makeAppSyncClient(authType: AppSyncClientTestHelper.AuthenticationType, @@ -794,102 +623,6 @@ class AWSAppSyncAPIKeyAuthTests: XCTestCase { return helper.appSyncClient } - func doSubscriptionResultHandlerTesting(withCacheConfiguration cacheConfiguration: AWSAppSyncCacheConfiguration?) throws { - let appSyncClient = try AWSAppSyncAPIKeyAuthTests.makeAppSyncClient(authType: self.authType, cacheConfiguration: cacheConfiguration) - - // OnDeltaPostSubscription requires no knowledge of prior state, so we can use it to test operations on an - // empty cache - let subscriptionWatcherTriggered = expectation(description: "Subscription watcher was triggered") - // We don't care if this gets triggered multiple times - subscriptionWatcherTriggered.assertForOverFulfill = false - let subscription = try appSyncClient.subscribe(subscription: OnDeltaPostSubscription()) { result, transaction, error in - defer { - subscriptionWatcherTriggered.fulfill() - } - - guard let transaction = transaction else { - XCTFail("Transaction unexpectedly nil in subscription watcher") - return - } - - guard error == nil else { - XCTFail("Unexpected error in subscription watcher: \(error!.localizedDescription)") - return - } - - guard - let result = result, - let onDeltaPostGraphQLResult = result.data?.onDeltaPost - else { - XCTFail("Result onDeltaPost unexpectedly empty in subscription watcher") - return - } - - let newPost = ListPostsQuery.Data.ListPost(id: onDeltaPostGraphQLResult.id, - author: onDeltaPostGraphQLResult.author, - title: onDeltaPostGraphQLResult.title, - content: onDeltaPostGraphQLResult.content, - url: onDeltaPostGraphQLResult.url, - ups: onDeltaPostGraphQLResult.ups, - downs: onDeltaPostGraphQLResult.downs, - file: nil, - createdDate: onDeltaPostGraphQLResult.createdDate, - awsDs: onDeltaPostGraphQLResult.awsDs) - - do { - try transaction.update(query: ListPostsQuery()) { (data: inout ListPostsQuery.Data) in - XCTAssertNil(data.listPosts) - data.listPosts = [newPost] - } - } catch { - XCTFail("Unexpected error updating local cache in subscription watcher: \(error.localizedDescription)") - return - } - } - - defer { - subscription?.cancel() - } - - // Currently, subscriptions don't have a good way to inspect that they have been registered on the service. - // We'll check for `getTopics` returning a non-empty value to stand in for a completion handler - let subscriptionIsRegisteredExpectation = expectation(description: "Subscription should have a non-empty topics list") - let subscriptionGetTopicsTimer = Timer.scheduledTimer(withTimeInterval: 0.25, repeats: true) { - _ in - guard let subscription = subscription else { - return - } - - let topics = subscription.getTopics() - - guard !topics.isEmpty else { - return - } - - subscriptionIsRegisteredExpectation.fulfill() - } - wait(for: [subscriptionIsRegisteredExpectation], timeout: AWSAppSyncAPIKeyAuthTests.networkOperationTimeout) - subscriptionGetTopicsTimer.invalidate() - - print("Sleeping a few seconds to wait for server to begin delivering subscriptions") - sleep(5) - - let newPost = CreatePostWithoutFileUsingParametersMutation(author: "Test author", - title: "Test Title", - content: "Test content", - url: "http://www.amazon.com/", - ups: 0, - downs: 0) - - let newPostCreated = expectation(description: "New post created") - self.appSyncClient?.perform(mutation: newPost, - queue: AWSAppSyncAPIKeyAuthTests.mutationQueue) { _, _ in - newPostCreated.fulfill() - } - - wait(for: [newPostCreated, subscriptionWatcherTriggered], timeout: AWSAppSyncAPIKeyAuthTests.networkOperationTimeout) - } - } private enum SyncWatcherLifecyclePhase { diff --git a/AWSAppSyncIntegrationTests/AppSyncMQTTClientTests.swift b/AWSAppSyncIntegrationTests/AppSyncMQTTClientTests.swift index 40cfc4b2..7ce26fbf 100644 --- a/AWSAppSyncIntegrationTests/AppSyncMQTTClientTests.swift +++ b/AWSAppSyncIntegrationTests/AppSyncMQTTClientTests.swift @@ -36,8 +36,8 @@ class AppSyncMQTTClientTests: XCTestCase { let client = AppSyncMQTTClient() let watcher = MockSubscriptionWatcher(topics: ["1", "2"]) - client.addWatcher(watcher: watcher, topics: watcher.getTopics(), identifier: watcher.getIdentifier()) - client.startSubscriptions(subscriptionInfo: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: watcher.getTopics())], identifier: "1") + client.add(watcher: watcher, forNewTopics: watcher.getTopics()) + client.startSubscriptions(subscriptionInfos: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: watcher.getTopics())], identifier: 1) wait(for: [expectation], timeout: 2) } @@ -59,14 +59,14 @@ class AppSyncMQTTClientTests: XCTestCase { let watcher0 = MockSubscriptionWatcher(topics: ["1", "2"]) - client.addWatcher(watcher: watcher0, topics: watcher0.getTopics(), identifier: watcher0.getIdentifier()) - client.startSubscriptions(subscriptionInfo: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: watcher0.getTopics())], identifier: "1") + client.add(watcher: watcher0, forNewTopics: watcher0.getTopics()) + client.startSubscriptions(subscriptionInfos: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: watcher0.getTopics())], identifier: 1) let watcher1 = MockSubscriptionWatcher(topics: ["2", "3"]) let allTopics = [watcher0.getTopics(), watcher1.getTopics()].flatMap({ $0 }) - client.addWatcher(watcher: watcher1, topics: watcher1.getTopics(), identifier: watcher1.getIdentifier()) - client.startSubscriptions(subscriptionInfo: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: allTopics)], identifier: "1") + client.add(watcher: watcher1, forNewTopics: watcher1.getTopics()) + client.startSubscriptions(subscriptionInfos: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: allTopics)], identifier: 1) wait(for: [expectation], timeout: 2) } @@ -88,8 +88,8 @@ class AppSyncMQTTClientTests: XCTestCase { let watcher = MockSubscriptionWatcher(topics: ["1", "2"]) let unwantedTopics = ["3"] - client.addWatcher(watcher: watcher, topics: watcher.getTopics(), identifier: watcher.getIdentifier()) - client.startSubscriptions(subscriptionInfo: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: unwantedTopics)], identifier: "1") + client.add(watcher: watcher, forNewTopics: watcher.getTopics()) + client.startSubscriptions(subscriptionInfos: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: unwantedTopics)], identifier: 1) wait(for: [expectation], timeout: 2) } @@ -110,14 +110,14 @@ class AppSyncMQTTClientTests: XCTestCase { let watcher = MockSubscriptionWatcher(topics: ["1", "2"]) let topics = ["2", "3"] - client.addWatcher(watcher: watcher, topics: watcher.getTopics(), identifier: watcher.getIdentifier()) - client.startSubscriptions(subscriptionInfo: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: topics)], identifier: "1") + client.add(watcher: watcher, forNewTopics: watcher.getTopics()) + client.startSubscriptions(subscriptionInfos: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: topics)], identifier: 1) wait(for: [expectation], timeout: 2) } func testStopSubscriptionsOnWatcherDealloc() { - + let connectExpectation = XCTestExpectation(description: "AWSIoTMQTTClient should connect") let disconnectExpectation = XCTestExpectation(description: "AWSIoTMQTTClient should disconnect") @@ -139,12 +139,12 @@ class AppSyncMQTTClientTests: XCTestCase { autoreleasepool { let deallocBlock: (MQTTSubscriptionWatcher) -> Void = { (object) in - client.stopSubscription(subscription: object, subscriptionId: "1") + client.cancelSubscription(for: object) } let watcher = MockSubscriptionWatcher(topics: ["1", "2"], deallocBlock: deallocBlock) - client.addWatcher(watcher: watcher, topics: watcher.getTopics(), identifier: watcher.getIdentifier()) - client.startSubscriptions(subscriptionInfo: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: watcher.topics)], identifier: "1") + client.add(watcher: watcher, forNewTopics: watcher.getTopics()) + client.startSubscriptions(subscriptionInfos: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: watcher.topics)], identifier: 1) weakWatcher = watcher wait(for: [connectExpectation], timeout: 2) @@ -156,7 +156,6 @@ class AppSyncMQTTClientTests: XCTestCase { } func testSubscribeTopicsAfterConnected() { - let connectExpectation = XCTestExpectation(description: "AWSIoTMQTTClient should connect") let subscriptionExpectation = XCTestExpectation(description: "AWSIoTMQTTClient should subscribe to topic once connected") @@ -164,7 +163,7 @@ class AppSyncMQTTClientTests: XCTestCase { var triggerConnectionStatusChangedToConnected: (() -> Void)? - let connect: @convention(block) (AWSIoTMQTTClient, NSString, NSString, Any?) -> Bool = { (instance, client, url, wat) -> Bool in + let connect: @convention(block) (AWSIoTMQTTClient, NSString, NSString, Any?) -> Bool = { (instance, _, _, _) -> Bool in connectExpectation.fulfill() triggerConnectionStatusChangedToConnected = { instance.clientDelegate.connectionStatusChanged(.connected, client: instance) @@ -174,21 +173,21 @@ class AppSyncMQTTClientTests: XCTestCase { var subscribedTopics = [String]() - let subscribe: @convention(block) (AWSIoTMQTTClient, NSString, UInt8, Any?) -> Void = { (_, topic, _, _) in + let subscribe: @convention(block) (AWSIoTMQTTClient, NSString, UInt8, Any?, Any?) -> Void = { (_, topic, _, _, _) in subscribedTopics.append(topic as String) subscriptionExpectation.fulfill() } AWSIoTMQTTClient.swizzle(selector: #selector(AWSIoTMQTTClient.connect(withClientId:presignedURL:statusCallback:)), withBlock: connect) - AWSIoTMQTTClient.swizzle(selector: #selector(AWSIoTMQTTClient.subscribe(toTopic:qos:extendedCallback:)), withBlock: subscribe) + AWSIoTMQTTClient.swizzle(selector: #selector(AWSIoTMQTTClient.subscribe(toTopic:qos:extendedCallback:ackCallback:)), withBlock: subscribe) let client = AppSyncMQTTClient() let watcher = MockSubscriptionWatcher(topics: ["1", "2", "3"]) let topics = ["2", "3"] - client.addWatcher(watcher: watcher, topics: watcher.getTopics(), identifier: watcher.getIdentifier()) - client.startSubscriptions(subscriptionInfo: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: topics)], identifier: "1") + client.add(watcher: watcher, forNewTopics: watcher.getTopics()) + client.startSubscriptions(subscriptionInfos: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: topics)], identifier: 1) wait(for: [connectExpectation], timeout: 5) @@ -225,8 +224,8 @@ class AppSyncMQTTClientTests: XCTestCase { let watcher = MockSubscriptionWatcher(topics: ["1"], disconnectCallbackBlock: disconnectCallbackBlock) - client.addWatcher(watcher: watcher, topics: watcher.getTopics(), identifier: watcher.getIdentifier()) - client.startSubscriptions(subscriptionInfo: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: watcher.getTopics())], identifier: "1") + client.add(watcher: watcher, forNewTopics: watcher.getTopics()) + client.startSubscriptions(subscriptionInfos: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: watcher.getTopics())], identifier: 1) wait(for: [connectExpectation], timeout: 2) @@ -262,8 +261,8 @@ class AppSyncMQTTClientTests: XCTestCase { let watcher = MockSubscriptionWatcher(topics: ["1"], messageCallbackBlock: messageCallbackBlock) - client.addWatcher(watcher: watcher, topics: watcher.getTopics(), identifier: watcher.getIdentifier()) - client.startSubscriptions(subscriptionInfo: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: watcher.getTopics())], identifier: "1") + client.add(watcher: watcher, forNewTopics: watcher.getTopics()) + client.startSubscriptions(subscriptionInfos: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: watcher.getTopics())], identifier: 1) wait(for: [connectExpectation], timeout: 2) @@ -295,13 +294,14 @@ class AppSyncMQTTClientTests: XCTestCase { let watcher = MockSubscriptionWatcher(topics: ["1", "2"]) - client.addWatcher(watcher: watcher, topics: watcher.getTopics(), identifier: watcher.getIdentifier()) - client.startSubscriptions(subscriptionInfo: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: watcher.topics)], identifier: "1") + client.add(watcher: watcher, forNewTopics: watcher.getTopics()) + client.startSubscriptions(subscriptionInfos: [AWSSubscriptionInfo(clientId: "1", url: "url", topics: watcher.topics)], identifier: 1) wait(for: [expectation], timeout: 2) - client.stopSubscription(subscription: watcher, subscriptionId: "1") + client.cancelSubscription(for: watcher) wait(for: [disconnectExpectation], timeout: 2) } + } diff --git a/AWSAppSyncIntegrationTests/Helpers/MockSubscriptionWatcher.swift b/AWSAppSyncIntegrationTests/Helpers/MockSubscriptionWatcher.swift index 0a8e8adc..a3f867ac 100644 --- a/AWSAppSyncIntegrationTests/Helpers/MockSubscriptionWatcher.swift +++ b/AWSAppSyncIntegrationTests/Helpers/MockSubscriptionWatcher.swift @@ -7,25 +7,30 @@ import Foundation @testable import AWSAppSync -class MockSubscriptionWatcher: MQTTSubscriptionWatcher { - func connectedCallbackDelegate() { - } +class MockSubscriptionWatcher: MQTTSubscriptionWatcher, CustomStringConvertible { let identifier: Int let topics: [String] let messageCallbackBlock: ((Data) -> Void)? let disconnectCallbackBlock: ((Error) -> Void)? let deallocBlock: ((MQTTSubscriptionWatcher) -> Void)? + let statusChangeCallbackBlock: ((AWSIoTMQTTStatus) -> Void)? + let subscriptionAcknowledgementCallbackBlock: (() -> Void)? init(topics: [String], deallocBlock:((MQTTSubscriptionWatcher) -> Void)? = nil, messageCallbackBlock:((Data) -> Void)? = nil, - disconnectCallbackBlock:((Error) -> Void)? = nil) { + disconnectCallbackBlock:((Error) -> Void)? = nil, + statusChangeCallbackBlock: ((AWSIoTMQTTStatus) -> Void)? = nil, + subscriptionAcknowledgementCallbackBlock: (() -> Void)? = nil + ) { self.identifier = NSUUID().hash self.topics = topics self.deallocBlock = deallocBlock self.messageCallbackBlock = messageCallbackBlock self.disconnectCallbackBlock = disconnectCallbackBlock + self.statusChangeCallbackBlock = statusChangeCallbackBlock + self.subscriptionAcknowledgementCallbackBlock = subscriptionAcknowledgementCallbackBlock } deinit { @@ -35,13 +40,32 @@ class MockSubscriptionWatcher: MQTTSubscriptionWatcher { func getIdentifier() -> Int { return identifier } + func getTopics() -> [String] { return topics } + func messageCallbackDelegate(data: Data) { messageCallbackBlock?(data) } + func disconnectCallbackDelegate(error: Error) { disconnectCallbackBlock?(error) } + + func statusChangeDelegate(status: AWSIoTMQTTStatus) { + statusChangeCallbackBlock?(status) + } + + func subscriptionAcknowledgementDelegate() { + subscriptionAcknowledgementCallbackBlock?() + } + + @available(*, deprecated, message: "This will be removed when we remove connectedCallbackDelegate from MQTTSubscriptionWatcher") + func connectedCallbackDelegate() { + } + + var description: String { + return "\(type(of: self)): \(getIdentifier())" + } } diff --git a/AWSAppSyncIntegrationTests/Helpers/SubscriptionStressTestHelper.swift b/AWSAppSyncIntegrationTests/Helpers/SubscriptionStressTestHelper.swift index e413f59f..d360d42f 100644 --- a/AWSAppSyncIntegrationTests/Helpers/SubscriptionStressTestHelper.swift +++ b/AWSAppSyncIntegrationTests/Helpers/SubscriptionStressTestHelper.swift @@ -10,190 +10,183 @@ import XCTest class SubscriptionStressTestHelper: XCTestCase { private static let numberOfPostsToTest = 40 + private static let networkOperationTimeout = TimeInterval(exactly: numberOfPostsToTest * 2)! + private static let mutationQueue = DispatchQueue(label: "com.amazonaws.appsync.SubscriptionStressTestHelper.mutationQueue") private static let subscriptionQueue = DispatchQueue(label: "com.amazonaws.appsync.SubscriptionStressTestHelper.subscriptionQueue") private var appSyncClient: AWSAppSyncClient! - private var testPostIDs = [GraphQLID](repeating: "", count: SubscriptionStressTestHelper.numberOfPostsToTest) // Hold onto this to retain references to the watchers during the test invocation - var subscriptionWatchers = [AWSAppSyncSubscriptionWatcher]() + private var subscriptionTestStateHolders = [GraphQLID: SubscriptionTestStateHolder]() // MARK: - Public test helper methods - func stressTestSubscriptions(with appSyncClient: AWSAppSyncClient) { + func stressTestSubscriptions(with appSyncClient: AWSAppSyncClient, + delayBetweenSubscriptions delay: TimeInterval? = nil) { defer { - subscriptionWatchers.forEach { $0.cancel() } - cleanUp() + subscriptionTestStateHolders.values.forEach { $0.watcher?.cancel() } } self.appSyncClient = appSyncClient - let allPostsAreCreatedExpectations = createPostsAndMakeExpectations() - wait(for: allPostsAreCreatedExpectations, timeout: TimeInterval(exactly: SubscriptionStressTestHelper.numberOfPostsToTest)!) + createPostsAndPopulateTestStateHolders() - XCTAssertEqual(testPostIDs.count, SubscriptionStressTestHelper.numberOfPostsToTest, "Number of created posts should be \(SubscriptionStressTestHelper.numberOfPostsToTest)") + XCTAssertEqual(subscriptionTestStateHolders.count, SubscriptionStressTestHelper.numberOfPostsToTest) // Add subscriptions for each of the created posts. The expectations will be fulfilled // after the mutations are generated below. - let allSubscriptionsAreTriggeredExpectations = subscribeToMutationsAndMakeExpectations() + subscribeToMutationsAndMakeExpectations(delayBetweenSubscriptions: delay) - print("Waiting 10s for the server to begin delivering subscriptions") - sleep(10) + let allPostsUpvoted = subscriptionTestStateHolders.values.map { $0.postUpvoted! } + let allSubscriptionsAcknowledged = subscriptionTestStateHolders.values.map { $0.subscriptionAcknowledged! } + let allSubscriptionsTriggered = subscriptionTestStateHolders.values.map { $0.subscriptionTriggered! } - let mutationExpectations = mutatePostsAndMakeExpectations() + let timeoutFactor: TimeInterval + if let delay = delay, delay > 1.0 { + timeoutFactor = delay * 2.0 + } else { + timeoutFactor = 2.0 + } - let combinedExpectations = mutationExpectations + allSubscriptionsAreTriggeredExpectations - wait(for: combinedExpectations, timeout: TimeInterval(exactly: SubscriptionStressTestHelper.numberOfPostsToTest)!) + let allExpectations = allSubscriptionsAcknowledged + allPostsUpvoted + allSubscriptionsTriggered + wait(for: allExpectations, timeout: SubscriptionStressTestHelper.networkOperationTimeout * timeoutFactor) } // MARK: - Private utility methods - private func createPostsAndMakeExpectations() -> [XCTestExpectation] { - // Create records to mutate later - var addPostsExpectations = [XCTestExpectation]() - + /// Asynchronously populate `subscriptionTestStateHolders` with new state holders + private func createPostsAndPopulateTestStateHolders() { + // Hold onto these expectations so we can create the mutations prior to returning + var allPostsAreCreatedExpectations = [XCTestExpectation]() for i in 0 ..< SubscriptionStressTestHelper.numberOfPostsToTest { - let addPostExpectation = XCTestExpectation(description: "Added post \(i)") - addPostsExpectations.append(addPostExpectation) + let testData = SubscriptionTestStateHolder(index: i) + + allPostsAreCreatedExpectations.append(testData.postCreated) + appSyncClient.perform(mutation: DefaultTestPostData.defaultCreatePostWithoutFileUsingParametersMutation, queue: SubscriptionStressTestHelper.mutationQueue) { result, error in - XCTAssertNil(error, "Error should be nil") - - guard - let result = result, - let payload = result.data?.createPostWithoutFileUsingParameters - else { - XCTFail("Result & payload should not be nil") - return - } - - XCTAssertEqual(payload.author, DefaultTestPostData.author, "Authors should match.") - let id = payload.id - self.testPostIDs[i] = id - addPostExpectation.fulfill() - print("Successful CreatePostWithoutFileUsingParametersMutation \(i) (\(id))") + XCTAssertNil(error, "Error should be nil") + + guard + let result = result, + let payload = result.data?.createPostWithoutFileUsingParameters + else { + XCTFail("Result & payload should not be nil") + return + } + + XCTAssertEqual(payload.author, DefaultTestPostData.author, "Authors should match.") + let id = payload.id + self.subscriptionTestStateHolders[id] = testData + testData.postId = id + testData.postCreated.fulfill() + print("Post created \(i) (\(id))") } - print("Attempting CreatePostWithoutFileUsingParametersMutation \(i)") } - return addPostsExpectations + wait(for: allPostsAreCreatedExpectations, timeout: SubscriptionStressTestHelper.networkOperationTimeout) } - private func subscribeToMutationsAndMakeExpectations() -> [XCTestExpectation] { - var subscriptionsTriggeredExpectations = [XCTestExpectation]() - - for (i, id) in testPostIDs.enumerated() { - let subscriptionTriggeredExpectation = XCTestExpectation(description: "Subscription triggered for post \(i) (\(id))") - subscriptionsTriggeredExpectations.append(subscriptionTriggeredExpectation) - - let subscription: OnUpvotePostSubscription = OnUpvotePostSubscription(id: id) - let optionalSubscriptionWatcher = try! appSyncClient.subscribe(subscription: subscription, - queue: SubscriptionStressTestHelper.subscriptionQueue) { result, _, error in - XCTAssertNil(error, "Error should be nil") - - guard let payload = result?.data?.onUpvotePost else { - XCTFail("Result & payload should not be nil") - return + /// Schedule subscription and mutation flows for each subscriptionTestStateHolder, with `delay` seconds in between + /// each subscription operation + private func subscribeToMutationsAndMakeExpectations(delayBetweenSubscriptions delay: TimeInterval?) { + let createSubscriptionsQueue = DispatchQueue(label: "subscribeToMutationsAndMakeExpectations") + let subscriptionTestStateHolderValues = subscriptionTestStateHolders.values.map { $0 } + createSubscriptionsQueue.async { + for i in 0 ..< SubscriptionStressTestHelper.numberOfPostsToTest { + let subscriptionTestStateHolder = subscriptionTestStateHolderValues[i] + self.subscribeAcknowledgeAndMutate(subscriptionTestStateHolder: subscriptionTestStateHolder) + if let delay = delay, delay > 0.0 { + Thread.sleep(forTimeInterval: delay) } - - let idFromPayload = payload.id - - subscriptionTriggeredExpectation.fulfill() - print("Triggered OnUpvotePostSubscription \(i) (\(idFromPayload))") } - print("Attempting OnUpvotePostSubscription \(i) (\(id))") - - guard let subscriptionWatcher = optionalSubscriptionWatcher else { - XCTFail("Subscription watcher \(i) (\(id)) should not be nil") - continue - } - - subscriptionWatchers.append(subscriptionWatcher) } - - waitForRegistration(of: subscriptionWatchers) - - return subscriptionsTriggeredExpectations } - typealias UnregisteredWatcherExpectation = (subscriptionWatcher: AWSAppSyncSubscriptionWatcher, expectation: XCTestExpectation) + private func subscribeAcknowledgeAndMutate(subscriptionTestStateHolder: SubscriptionTestStateHolder) { + let subscription: OnUpvotePostSubscription = OnUpvotePostSubscription(id: subscriptionTestStateHolder.postId) - // Currently, `AWSAppSyncClient.subscribe(subscription:queue:resultHandler:)` doesn't have a - // good way to inspect that the subscription has been registered on the service. We'll check - // for `getTopics` returning a non-empty value to stand in for a completion handler - private func waitForRegistration(of subscriptionWatchers: [AWSAppSyncSubscriptionWatcher]) { - - var subscriptionWatcherRegisteredExpectations = [XCTestExpectation]() - - var unregisteredWatcherExpectationsByIndex = [Int: UnregisteredWatcherExpectation]() - for (i, watcher) in subscriptionWatchers.enumerated() { - let expectation = XCTestExpectation(description: "Subscription watcher \(i) is registered") - subscriptionWatcherRegisteredExpectations.append(expectation) - unregisteredWatcherExpectationsByIndex[i] = (subscriptionWatcher: watcher, expectation: expectation) + let statusChangeHandler: SubscriptionStatusChangeHandler = { status in + if case .connected = status { + print("Subscription acknowledged \(subscriptionTestStateHolder.index) (\(subscriptionTestStateHolder.postId!))") + subscriptionTestStateHolder.subscriptionAcknowledged.fulfill() + self.mutatePost(for: subscriptionTestStateHolder) + } } - // Wait until subscriptions are all registered - let subscriptionWatcherRegistrationTimer = Timer.scheduledTimer(withTimeInterval: 0.25, repeats: true) { - _ in - var indexesToDelete = Set() - for index in unregisteredWatcherExpectationsByIndex.keys { - guard let (subscriptionWatcher, expectation) = unregisteredWatcherExpectationsByIndex[index] else { - continue - } - let isRegistered = !subscriptionWatcher.getTopics().isEmpty - if isRegistered { - expectation.fulfill() - indexesToDelete.insert(index) - print("Registered OnUpvotePostSubscription \(index)") - } + print("Subscribing \(subscriptionTestStateHolder.index) (\(subscriptionTestStateHolder.postId!))") + let optionalSubscriptionWatcher = try! appSyncClient.subscribe( + subscription: subscription, + queue: SubscriptionStressTestHelper.subscriptionQueue, + statusChangeHandler: statusChangeHandler + ) { + result, _, error in + XCTAssertNil(error, "Error should be nil") + + guard result?.data?.onUpvotePost != nil else { + XCTFail("Result & payload should not be nil") + return } - for index in indexesToDelete { - unregisteredWatcherExpectationsByIndex.removeValue(forKey: index) - } + subscriptionTestStateHolder.subscriptionTriggered.fulfill() + print("Subscription triggered \(subscriptionTestStateHolder.index) (\(subscriptionTestStateHolder.postId!))") } - // Wait for all subscriptions to be registered - let timeToWait = Double(testPostIDs.count) - wait(for: subscriptionWatcherRegisteredExpectations, timeout: timeToWait) - subscriptionWatcherRegistrationTimer.invalidate() + guard let subscriptionWatcher = optionalSubscriptionWatcher else { + XCTFail("Subscription watcher \(subscriptionTestStateHolder.index) (\(subscriptionTestStateHolder.postId!)) should not be nil") + return + } - XCTAssertTrue(unregisteredWatcherExpectationsByIndex.isEmpty, "All subscriptions should have been registered by now") + subscriptionTestStateHolder.watcher = subscriptionWatcher } /// Mutate the watched objects by upvoting them. This should fire each associated subscription /// We set up mutation expectations so that, if the test fails, we can tell whether it was the /// mutation that failed to complete, or the subscription that failed to trigger - private func mutatePostsAndMakeExpectations() -> [XCTestExpectation] { - var mutationExpectations = [XCTestExpectation]() - for (i, id) in testPostIDs.enumerated() { - let upvoteExpectation = XCTestExpectation(description: "Upvoted on event \(i)") - mutationExpectations.append(upvoteExpectation) - - let mutation = UpvotePostMutation(id: id) - - appSyncClient.perform(mutation: mutation, queue: SubscriptionStressTestHelper.mutationQueue) { result, error in - XCTAssertNil(error, "Error should be nil") - - guard - let result = result, - let _ = result.data?.upvotePost - else { - XCTFail("Result & payload should not be nil") - return - } - upvoteExpectation.fulfill() - print("Successful UpvotePostMutation \(i) (\(id))") + private func mutatePost(for subscriptionTestStateHolder: SubscriptionTestStateHolder) { + let postId = subscriptionTestStateHolder.postId! + let mutation = UpvotePostMutation(id: postId) + + appSyncClient.perform(mutation: mutation, queue: SubscriptionStressTestHelper.mutationQueue) { result, error in + XCTAssertNil(error, "Error should be nil") + + guard + let result = result, + let _ = result.data?.upvotePost + else { + XCTFail("Result & payload should not be nil") + return } - print("Attempting UpvotePostMutation \(i) (\(id))") + print("Post upvoted \(subscriptionTestStateHolder.index) (\(subscriptionTestStateHolder.postId!))") + subscriptionTestStateHolder.postUpvoted.fulfill() } - return mutationExpectations } - // Removes test records from DB - private func cleanUp() { +} + +// A state holder for the various stages of a subscription test, built up by each operation +private class SubscriptionTestStateHolder { + let index: Int + var postId: GraphQLID! { + didSet { + self.subscriptionAcknowledged = XCTestExpectation(description: "subscriptionAcknowledged for \(index) (\(postId!))") + self.postUpvoted = XCTestExpectation(description: "postUpvoted for \(index) (\(postId!))") + self.subscriptionTriggered = XCTestExpectation(description: "subscriptionTriggered for \(index) (\(postId!))") + } + } + + var watcher: AWSAppSyncSubscriptionWatcher! + let postCreated: XCTestExpectation + + // These are created after the postId is set + var subscriptionAcknowledged: XCTestExpectation! + var postUpvoted: XCTestExpectation! + var subscriptionTriggered: XCTestExpectation! + init(index: Int) { + self.index = index + postCreated = XCTestExpectation(description: "postCreated for \(index)") } } diff --git a/AWSAppSyncIntegrationTests/SubscriptionTests.swift b/AWSAppSyncIntegrationTests/SubscriptionTests.swift new file mode 100644 index 00000000..31caf27b --- /dev/null +++ b/AWSAppSyncIntegrationTests/SubscriptionTests.swift @@ -0,0 +1,346 @@ +// +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// Licensed under the Amazon Software License +// http://aws.amazon.com/asl/ +// + +import XCTest +@testable import AWSAppSync +@testable import AWSCore +@testable import AWSAppSyncTestCommon + +class SubscriptionTests: XCTestCase { + + // MARK: - Properties + + /// Use this as our timeout value for any operation that hits the network. Note that this may need to be higher + /// than you think, to account for CI systems running in shared environments + private static let networkOperationTimeout = 60.0 + + private static let mutationQueue = DispatchQueue(label: "com.amazonaws.appsync.SubscriptionTests.mutationQueue") + private static let subscriptionAndFetchQueue = DispatchQueue(label: "com.amazonaws.appsync.SubscriptionTests.subscriptionAndFetchQueue") + + /// This will be automatically instantiated in `performDefaultSetUpSteps` + var appSyncClient: AWSAppSyncClient? + + let authType = AppSyncClientTestHelper.AuthenticationType.apiKey + + override func setUp() { + super.setUp() + + AWSDDLog.sharedInstance.logLevel = .debug + AWSDDTTYLogger.sharedInstance.logFormatter = AWSAppSyncClientLogFormatter() + AWSDDLog.sharedInstance.add(AWSDDTTYLogger.sharedInstance) + + do { + appSyncClient = try SubscriptionTests.makeAppSyncClient(authType: authType) + } catch { + XCTFail(error.localizedDescription) + } + } + + // MARK: - Tests + + // Tests subscription system by registering subscriptions as fast as possible + func testSubscription_Stress() { + guard let appSyncClient = appSyncClient else { + XCTFail("appSyncClient should not be nil") + return + } + let subscriptionStressTestHelper = SubscriptionStressTestHelper() + subscriptionStressTestHelper.stressTestSubscriptions(with: appSyncClient) + } + + // Tests subscription system by registering many subscriptions but interleaving them with a delay, to + // ensure that some connections are being started while others are being dropped + func testSubscription_StressWithInterleavedConnections() { + guard let appSyncClient = appSyncClient else { + XCTFail("appSyncClient should not be nil") + return + } + let subscriptionStressTestHelper = SubscriptionStressTestHelper() + subscriptionStressTestHelper.stressTestSubscriptions(with: appSyncClient, delayBetweenSubscriptions: 1.0) + } + + func testSubscription() throws { + let postCreated = expectation(description: "Post created successfully.") + let addPost = DefaultTestPostData.defaultCreatePostWithoutFileUsingParametersMutation + + var idHolder: GraphQLID? + appSyncClient?.perform(mutation: addPost, queue: SubscriptionTests.mutationQueue) { result, error in + print("CreatePost result handler invoked") + XCTAssertNil(error) + XCTAssertNotNil(result?.data?.createPostWithoutFileUsingParameters?.id) + XCTAssertEqual(result!.data!.createPostWithoutFileUsingParameters?.author, DefaultTestPostData.author) + idHolder = result?.data?.createPostWithoutFileUsingParameters?.id + postCreated.fulfill() + } + wait(for: [postCreated], timeout: SubscriptionTests.networkOperationTimeout) + + guard let id = idHolder else { + XCTFail("Expected ID from addPost mutation") + return + } + + // This handler will be invoked if an error occurs during the setup, or if we receive a successful mutation response. + let subscriptionResultHandlerInvoked = expectation(description: "Subscription received successfully.") + let subscriptionIsActive = expectation(description: "Upvote subscription should be connected") + + let statusChangeHandler: SubscriptionStatusChangeHandler = { status in + if case .connected = status { + subscriptionIsActive.fulfill() + } + } + + let subscription = try self.appSyncClient?.subscribe(subscription: OnUpvotePostSubscription(id: id), + queue: SubscriptionTests.subscriptionAndFetchQueue, + statusChangeHandler: statusChangeHandler) { result, _, error in + print("Subscription result handler invoked") + guard error == nil else { + XCTAssertNil(error) + return + } + + guard result != nil else { + XCTFail("Result was unexpectedly nil") + return + } + subscriptionResultHandlerInvoked.fulfill() + } + XCTAssertNotNil(subscription, "Subscription expected to be non nil.") + + defer { + subscription?.cancel() + } + + wait(for: [subscriptionIsActive], timeout: SubscriptionTests.networkOperationTimeout) + + let upvotePerformed = expectation(description: "Upvote mutation performed") + let upvoteMutation = UpvotePostMutation(id: id) + self.appSyncClient?.perform(mutation: upvoteMutation, queue: SubscriptionTests.mutationQueue) { + result, error in + print("Received upvote mutation response.") + XCTAssertNil(error) + XCTAssertNotNil(result?.data?.upvotePost?.id) + upvotePerformed.fulfill() + } + + wait(for: [upvotePerformed, subscriptionResultHandlerInvoked], timeout: SubscriptionTests.networkOperationTimeout) + } + + func testSubscriptionReceivesConnectedMessage() throws { + AWSDDLog.sharedInstance.logLevel = .verbose + AWSDDTTYLogger.sharedInstance.logFormatter = AWSAppSyncClientLogFormatter() + AWSDDLog.sharedInstance.add(AWSDDTTYLogger.sharedInstance) + + let statusChangedToConnected = expectation(description: "Subscription received status change notification to 'connected'") + + let statusChangeHandler: SubscriptionStatusChangeHandler = { status in + if case .connected = status { + statusChangedToConnected.fulfill() + } + } + let subscription = try self.appSyncClient?.subscribe(subscription: OnUpvotePostSubscription(id: "123"), + queue: SubscriptionTests.subscriptionAndFetchQueue, + statusChangeHandler: statusChangeHandler) { _, _, _ in } + + defer { + subscription?.cancel() + } + + wait(for: [statusChangedToConnected], timeout: SubscriptionTests.networkOperationTimeout) + } + + func testSubscriptionIsInvokedOnProvidedQueue() throws { + let label = "testSyncOperationAtSetupAndReconnect.syncWatcherCallbackQueue" + let syncWatcherCallbackQueue = DispatchQueue(label: label) + let queueIdentityKey = DispatchSpecificKey() + let queueIdentityValue = label + syncWatcherCallbackQueue.setSpecific(key: queueIdentityKey, value: queueIdentityValue) + + let appSyncClient = try SubscriptionTests.makeAppSyncClient(authType: self.authType) + + let postCreated = expectation(description: "Post created successfully.") + let addPost = DefaultTestPostData.defaultCreatePostWithoutFileUsingParametersMutation + var idHolder: GraphQLID? + appSyncClient.perform(mutation: addPost, queue: SubscriptionTests.mutationQueue) { result, error in + print("CreatePost result handler invoked") + idHolder = result?.data?.createPostWithoutFileUsingParameters?.id + postCreated.fulfill() + } + wait(for: [postCreated], timeout: SubscriptionTests.networkOperationTimeout) + + guard let id = idHolder else { + XCTFail("Expected ID from addPost mutation") + return + } + + // We use the base query result handler to know that the subscription is active. Delta Sync does not attempt to + // perform a server query until the subscription is established, to ensure that no data is lost between the time + // we begin establishing a sync connection and the time we finish the base query + let baseQueryFetchFromServerComplete = expectation(description: "BaseQuery fetch from server complete") + let baseQueryResultHandler: (GraphQLResult?, Error?) -> Void = { result, _ in + guard let result = result else { + return + } + if result.source == .server { + baseQueryFetchFromServerComplete.fulfill() + } + } + + let deltaQueryResultHandler: (GraphQLResult?, ApolloStore.ReadWriteTransaction?, Error?) -> Void = { _, _, _ in } + + let subscriptionResultHandlerInvoked = expectation(description: "Subscription result handler invoked") + let subscriptionResultHandler: (GraphQLResult?, ApolloStore.ReadWriteTransaction?, Error?) -> Void = { _, _, _ in + subscriptionResultHandlerInvoked.fulfill() + let dispatchQueueValue = DispatchQueue.getSpecific(key: queueIdentityKey) + XCTAssertEqual(dispatchQueueValue, queueIdentityValue, "Expected callback to be invoked on provided queue") + } + + let listPostsQuery = ListPostsQuery() + let subscription = OnUpvotePostSubscription(id: id) + + let syncWatcher = appSyncClient.sync( + baseQuery: listPostsQuery, + baseQueryResultHandler: baseQueryResultHandler, + subscription: subscription, + subscriptionResultHandler: subscriptionResultHandler, + deltaQuery: listPostsQuery, + deltaQueryResultHandler: deltaQueryResultHandler, + callbackQueue: syncWatcherCallbackQueue, + syncConfiguration: SyncConfiguration() + ) + + defer { + syncWatcher.cancel() + } + + wait(for: [baseQueryFetchFromServerComplete], timeout: SubscriptionTests.networkOperationTimeout) + + let upvote = UpvotePostMutation(id: id) + let upvoteComplete = expectation(description: "Upvote mutation completed") + + self.appSyncClient?.perform(mutation: upvote, + queue: SubscriptionTests.mutationQueue) { _, _ in + upvoteComplete.fulfill() + } + + wait( + for: [ + upvoteComplete, + subscriptionResultHandlerInvoked, + ], + timeout: SubscriptionTests.networkOperationTimeout + ) + } + + func testSubscriptionResultHandlerCanOperateOnEmptyCacheWithBackingDatabase() throws { + let rootDirectory = FileManager.default.temporaryDirectory.appendingPathComponent("emptyCacheTest-\(UUID().uuidString)") + let cacheConfiguration = try AWSAppSyncCacheConfiguration(withRootDirectory: rootDirectory) + try doSubscriptionResultHandlerTesting(withCacheConfiguration: cacheConfiguration) + } + + func testSubscriptionResultHandlerCanOperateOnEmptyCacheWithoutBackingDatabase() throws { + try doSubscriptionResultHandlerTesting(withCacheConfiguration: nil) + } + + // MARK: - Utilities + + func doSubscriptionResultHandlerTesting(withCacheConfiguration cacheConfiguration: AWSAppSyncCacheConfiguration?) throws { + let appSyncClient = try SubscriptionTests.makeAppSyncClient(authType: self.authType, cacheConfiguration: cacheConfiguration) + + // OnDeltaPostSubscription requires no knowledge of prior state, so we can use it to test operations on an + // empty cache + let subscriptionWatcherTriggered = expectation(description: "Subscription watcher was triggered") + // We don't care if this gets triggered multiple times + subscriptionWatcherTriggered.assertForOverFulfill = false + + let subscriptionIsActive = expectation(description: "Subscription should be active") + let statusChangeHandler: SubscriptionStatusChangeHandler = { status in + if case .connected = status { + subscriptionIsActive.fulfill() + } + } + + let subscription = try appSyncClient.subscribe(subscription: OnDeltaPostSubscription(), + queue: SubscriptionTests.subscriptionAndFetchQueue, + statusChangeHandler: statusChangeHandler) { result, transaction, error in + defer { + subscriptionWatcherTriggered.fulfill() + } + + guard let transaction = transaction else { + XCTFail("Transaction unexpectedly nil in subscription watcher") + return + } + + guard error == nil else { + XCTFail("Unexpected error in subscription watcher: \(error!.localizedDescription)") + return + } + + guard + let result = result, + let onDeltaPostGraphQLResult = result.data?.onDeltaPost + else { + XCTFail("Result onDeltaPost unexpectedly empty in subscription watcher") + return + } + + let newPost = ListPostsQuery.Data.ListPost(id: onDeltaPostGraphQLResult.id, + author: onDeltaPostGraphQLResult.author, + title: onDeltaPostGraphQLResult.title, + content: onDeltaPostGraphQLResult.content, + url: onDeltaPostGraphQLResult.url, + ups: onDeltaPostGraphQLResult.ups, + downs: onDeltaPostGraphQLResult.downs, + file: nil, + createdDate: onDeltaPostGraphQLResult.createdDate, + awsDs: onDeltaPostGraphQLResult.awsDs) + + do { + try transaction.update(query: ListPostsQuery()) { (data: inout ListPostsQuery.Data) in + XCTAssertNil(data.listPosts) + data.listPosts = [newPost] + } + } catch { + XCTFail("Unexpected error updating local cache in subscription watcher: \(error.localizedDescription)") + return + } + } + + defer { + subscription?.cancel() + } + + wait(for: [subscriptionIsActive], timeout: SubscriptionTests.networkOperationTimeout) + + let newPost = CreatePostWithoutFileUsingParametersMutation(author: "Test author", + title: "Test Title", + content: "Test content", + url: "http://www.amazon.com/", + ups: 0, + downs: 0) + + let newPostCreated = expectation(description: "New post created") + self.appSyncClient?.perform(mutation: newPost, + queue: SubscriptionTests.mutationQueue) { _, _ in + newPostCreated.fulfill() + } + + wait(for: [newPostCreated, subscriptionWatcherTriggered], timeout: SubscriptionTests.networkOperationTimeout) + } + + static func makeAppSyncClient(authType: AppSyncClientTestHelper.AuthenticationType, + cacheConfiguration: AWSAppSyncCacheConfiguration? = nil) throws -> DeinitNotifiableAppSyncClient { + + let testBundle = Bundle(for: SubscriptionTests.self) + let helper = try AppSyncClientTestHelper( + with: authType, + cacheConfiguration: cacheConfiguration, + testBundle: testBundle + ) + return helper.appSyncClient + } + +} diff --git a/CHANGELOG.md b/CHANGELOG.md index 91b8bf5f..67602544 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,11 +2,20 @@ The AWS AppSync SDK for iOS enables you to access your AWS AppSync backend and perform operations like `Queries`, `Mutations` and `Subscriptions`. The SDK also includes support for offline operations. -## 2.10.2 +## 2.10.3 + +### New Features + +- The AppSyncClient's `subscribe` method now accepts an optional `statusChangeHandler`. If provided, then the `AWSAppSyncSubscriptionWatcher` + returned by the `subscribe` method will invoke that method when it is notified of changes to the state of the underlying MQTT client. + `AWSAppSyncSubscriptionWatcherStatus` for a description of the statuses and their progression. Thanks @fans3210, @shannon-hager-skookum, and @achager for + contributing your thoughts to the original request ([Issue #42](https://github.com/awslabs/aws-mobile-appsync-sdk-ios/issues/42)) and to + @MarioBajr for contributing the original implementation on [PR #75](https://github.com/awslabs/aws-mobile-appsync-sdk-ios/pull/75). ### Bug fixes - Fixed incorrect AWSCore dependency version in podspec ([Issue #190](https://github.com/awslabs/aws-mobile-appsync-sdk-ios/issues/190)) +- Fixed data races in AppSyncMQTTClient that were causing crashes ([Issue #184](https://github.com/awslabs/aws-mobile-appsync-sdk-ios/issues/184)) ### Misc. Updates @@ -22,7 +31,11 @@ The AWS AppSync SDK for iOS enables you to access your AWS AppSync backend and p 2019-03-04 07:21:32.135-0800 [V AWSPerformMutationQueue.init(appSyncClient:networkClient:reachabiltyChangeNotifier:cacheFileURL:), L24] Initializing AWSPerformMutationQueue 2019-03-04 07:21:32.135-0800 [V AWSPerformMutationQueue.resume(), L95] Resuming OperationQueue ``` -- Added some verbose logging around mutation queue handling; minor log additions elsewhere + Please note that `verbose` logging is quite verbose, and there is a significant difference between `verbose` and `debug`. We will be making + `debug` more useful as we go. (See [Issue #145](https://github.com/awslabs/aws-mobile-appsync-sdk-ios/issues/145)) + + As always, we recommend turning off logging when deploying to production. +- Added some verbose logging around mutation queue handling and subscription connections; minor log additions elsewhere - Minor dead code removal & miscellaneous cleanup ## 2.10.2 From 8faf176ffa0570c137421da242e777685552737e Mon Sep 17 00:00:00 2001 From: Tim Schmelter Date: Thu, 7 Mar 2019 21:04:54 -0800 Subject: [PATCH 2/2] Increase log level to .warning for subscription tests to avoid spamming build logs --- AWSAppSyncIntegrationTests/SubscriptionTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/AWSAppSyncIntegrationTests/SubscriptionTests.swift b/AWSAppSyncIntegrationTests/SubscriptionTests.swift index 31caf27b..cb8da39b 100644 --- a/AWSAppSyncIntegrationTests/SubscriptionTests.swift +++ b/AWSAppSyncIntegrationTests/SubscriptionTests.swift @@ -28,7 +28,7 @@ class SubscriptionTests: XCTestCase { override func setUp() { super.setUp() - AWSDDLog.sharedInstance.logLevel = .debug + AWSDDLog.sharedInstance.logLevel = .warning AWSDDTTYLogger.sharedInstance.logFormatter = AWSAppSyncClientLogFormatter() AWSDDLog.sharedInstance.add(AWSDDTTYLogger.sharedInstance)