Skip to content

Commit

Permalink
Fixed AppSyncMQTTClient data races (#184)
Browse files Browse the repository at this point in the history
- Added `statusChangeHandler` to AppSyncClient's `subscribe` method to provide status updates.
- Added verbose logging around subscription connections
- Cancel streams and reconnect threads on disconnect
  • Loading branch information
palpatim committed Mar 8, 2019
1 parent f6691ef commit fdd8679
Show file tree
Hide file tree
Showing 17 changed files with 1,446 additions and 760 deletions.
12 changes: 12 additions & 0 deletions AWSAppSyncClient.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@
FA0C12BB21CD308A00B438CB /* AWSAppSyncClientConfiguration.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA0C12B921CD2FFB00B438CB /* AWSAppSyncClientConfiguration.swift */; };
FA0C12BF21CD360B00B438CB /* AWSAppSyncAuthType.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA0C12BE21CD360B00B438CB /* AWSAppSyncAuthType.swift */; };
FA0C12C121CD3BB200B438CB /* BasicAWSAPIKeyAuthProvider.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA0C12C021CD3BB200B438CB /* BasicAWSAPIKeyAuthProvider.swift */; };
FA0D825422307BA400E0EA82 /* AWSAppSyncSubscriptionWatcherStatus.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA0D825322307BA400E0EA82 /* AWSAppSyncSubscriptionWatcherStatus.swift */; };
FA0D82582230D0AF00E0EA82 /* AWSAppSyncSubscriptionError.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA0D82572230D0AF00E0EA82 /* AWSAppSyncSubscriptionError.swift */; };
FA0D825C22317C7900E0EA82 /* SubscriptionTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA0D825B22317C7900E0EA82 /* SubscriptionTests.swift */; };
FA1A620C21E6533A00AA54D0 /* AWSAppSyncRetryHandlerTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA1A620B21E6533A00AA54D0 /* AWSAppSyncRetryHandlerTests.swift */; };
FA2B4598221DDF2C00F68E6C /* CachePersistenceTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA2B4597221DDF2C00F68E6C /* CachePersistenceTests.swift */; };
FA2B459A221F436400F68E6C /* MutationQueuePerformanceTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = FA2B4599221F436400F68E6C /* MutationQueuePerformanceTests.swift */; };
Expand Down Expand Up @@ -444,6 +447,9 @@
FA0C12BE21CD360B00B438CB /* AWSAppSyncAuthType.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSAppSyncAuthType.swift; sourceTree = "<group>"; };
FA0C12C021CD3BB200B438CB /* BasicAWSAPIKeyAuthProvider.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = BasicAWSAPIKeyAuthProvider.swift; sourceTree = "<group>"; };
FA0C12C721CD96BF00B438CB /* AWSAppSyncClientConfigurationTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSAppSyncClientConfigurationTests.swift; sourceTree = "<group>"; };
FA0D825322307BA400E0EA82 /* AWSAppSyncSubscriptionWatcherStatus.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSAppSyncSubscriptionWatcherStatus.swift; sourceTree = "<group>"; };
FA0D82572230D0AF00E0EA82 /* AWSAppSyncSubscriptionError.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSAppSyncSubscriptionError.swift; sourceTree = "<group>"; };
FA0D825B22317C7900E0EA82 /* SubscriptionTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = SubscriptionTests.swift; sourceTree = "<group>"; };
FA1A620B21E6533A00AA54D0 /* AWSAppSyncRetryHandlerTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AWSAppSyncRetryHandlerTests.swift; sourceTree = "<group>"; };
FA2B4597221DDF2C00F68E6C /* CachePersistenceTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = CachePersistenceTests.swift; sourceTree = "<group>"; };
FA2B4599221F436400F68E6C /* MutationQueuePerformanceTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MutationQueuePerformanceTests.swift; sourceTree = "<group>"; };
Expand Down Expand Up @@ -776,7 +782,9 @@
FAAACC2521DD7BC300D24B37 /* AWSAppSyncMutations.swift */,
FAF5D78C21D3F04E00FC04D2 /* AWSAppSyncServiceConfig.swift */,
FA3E128421D5297500F2D19A /* AWSAppSyncServiceConfigError.swift */,
FA0D82572230D0AF00E0EA82 /* AWSAppSyncSubscriptionError.swift */,
1729A0CF1FA1365900F88594 /* AWSAppSyncSubscriptionWatcher.swift */,
FA0D825322307BA400E0EA82 /* AWSAppSyncSubscriptionWatcherStatus.swift */,
DF9468DA20E1CA4A00E40482 /* AWSNetworkTransport.swift */,
17A7F8C91FB8BCBC008D1393 /* AWSS3ObjectProtocol.swift */,
CCEF79E021DE7FF7004AD64D /* Deprecations+Removals.swift */,
Expand Down Expand Up @@ -885,6 +893,7 @@
17664128214F6732003AE269 /* AWSAppSyncAPIKeyAuthTests.swift */,
174F80AE2109229C00775D0D /* AWSAppSyncCognitoAuthTests.swift */,
FA8C62C421D6E41600FF9924 /* Info.plist */,
FA0D825B22317C7900E0EA82 /* SubscriptionTests.swift */,
FAA83BF7220A6AB40029FF7B /* testS3Object.jpg */,
);
path = AWSAppSyncIntegrationTests;
Expand Down Expand Up @@ -1568,6 +1577,7 @@
17E009C61FCAB234005031DB /* GraphQLSelectionSet.swift in Sources */,
FAFB258722051B2100068B38 /* AWSAppSyncClientInfo.swift in Sources */,
17E009C81FCAB234005031DB /* Locking.swift in Sources */,
FA0D825422307BA400E0EA82 /* AWSAppSyncSubscriptionWatcherStatus.swift in Sources */,
FA3E128321D5290900F2D19A /* AWSAppSyncClientInfoError.swift in Sources */,
FABD70692203C17A00C99B47 /* AWSAppSyncCache.swift in Sources */,
174F80922107E74F00775D0D /* AWSMQTTDecoder.m in Sources */,
Expand All @@ -1576,6 +1586,7 @@
FA0C12BB21CD308A00B438CB /* AWSAppSyncClientConfiguration.swift in Sources */,
178B31081FCDB34100EA4619 /* AWSAppSyncClientConflictResolutionExtensions.swift in Sources */,
1729A0D01FA1365900F88594 /* AWSAppSyncSubscriptionWatcher.swift in Sources */,
FA0D82582230D0AF00E0EA82 /* AWSAppSyncSubscriptionError.swift in Sources */,
FAAACC2421DD7AC600D24B37 /* InternalS3ObjectDetails.swift in Sources */,
17E009D91FCAB234005031DB /* JSONStandardTypeConversions.swift in Sources */,
FA0C12C121CD3BB200B438CB /* BasicAWSAPIKeyAuthProvider.swift in Sources */,
Expand Down Expand Up @@ -1635,6 +1646,7 @@
FA902D1021D97EB100C4052F /* AWSAppSyncCognitoAuthTests.swift in Sources */,
FA902D1321D97EC500C4052F /* SubscriptionStressTestHelper.swift in Sources */,
FA902D0F21D97EAD00C4052F /* AWSAppSyncAPIKeyAuthTests.swift in Sources */,
FA0D825C22317C7900E0EA82 /* SubscriptionTests.swift in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
};
Expand Down
27 changes: 8 additions & 19 deletions AWSAppSyncClient/AWSAppSyncClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import AWSCore

public typealias SubscriptionResultHandler<Operation: GraphQLSubscription> = (_ result: GraphQLResult<Operation.Data>?, _ transaction: ApolloStore.ReadWriteTransaction?, _ error: Error?) -> Void

public typealias SubscriptionStatusChangeHandler = (AWSAppSyncSubscriptionWatcherStatus) -> Void

public typealias DeltaQueryResultHandler<Operation: GraphQLQuery> = (_ result: GraphQLResult<Operation.Data>?, _ transaction: ApolloStore.ReadWriteTransaction?, _ error: Error?) -> Void

public typealias OptimisticResponseBlock = (ApolloStore.ReadWriteTransaction?) -> Void
Expand All @@ -17,23 +19,6 @@ public typealias MutationConflictHandler<Mutation: GraphQLMutation> = (_ serverS

internal let NoOpOperationString = "No-op"

public struct AWSAppSyncSubscriptionError: Error, LocalizedError {
let additionalInfo: String?
let errorDetails: [String: String]?

public var errorDescription: String? {
return additionalInfo ?? "Unable to start subscription."
}

public var recoverySuggestion: String? {
return errorDetails?["recoverySuggestion"]
}

public var failureReason: String? {
return errorDetails?["failureReason"]
}
}

/// Delegates will be notified when a mutation is performed from the `mutationCallback`. This pattern is necessary
/// in order to provide notifications of mutations which are performed after an app restart and the initial callback
/// context has been lost.
Expand Down Expand Up @@ -76,7 +61,6 @@ public class AWSAppSyncClient {

self.autoSubmitOfflineMutations = appSyncConfig.autoSubmitOfflineMutations
self.store = appSyncConfig.store
self.appSyncMQTTClient.allowCellularAccess = appSyncConfig.allowsCellularAccess
self.presignedURLClient = appSyncConfig.presignedURLClient
self.s3ObjectManager = appSyncConfig.s3ObjectManager
self.subscriptionMetadataCache = appSyncConfig.subscriptionMetadataCache
Expand Down Expand Up @@ -155,14 +139,18 @@ public class AWSAppSyncClient {
return apolloClient!.watch(query: query, cachePolicy: cachePolicy, queue: queue, resultHandler: resultHandler)
}

public func subscribe<Subscription: GraphQLSubscription>(subscription: Subscription, queue: DispatchQueue = DispatchQueue.main, resultHandler: @escaping SubscriptionResultHandler<Subscription>) throws -> AWSAppSyncSubscriptionWatcher<Subscription>? {
public func subscribe<Subscription: GraphQLSubscription>(subscription: Subscription,
queue: DispatchQueue = DispatchQueue.main,
statusChangeHandler: SubscriptionStatusChangeHandler? = nil,
resultHandler: @escaping SubscriptionResultHandler<Subscription>) throws -> AWSAppSyncSubscriptionWatcher<Subscription>? {

return AWSAppSyncSubscriptionWatcher(client: self.appSyncMQTTClient,
httpClient: self.httpTransport!,
store: self.store!,
subscriptionsQueue: self.subscriptionsQueue,
subscription: subscription,
handlerQueue: queue,
statusChangeHandler: statusChangeHandler,
resultHandler: resultHandler)
}

Expand All @@ -174,6 +162,7 @@ public class AWSAppSyncClient {
subscriptionsQueue: self.subscriptionsQueue,
subscription: subscription,
handlerQueue: queue,
statusChangeHandler: nil,
connectedCallback: connectCallback,
resultHandler: resultHandler)
}
Expand Down
97 changes: 97 additions & 0 deletions AWSAppSyncClient/AWSAppSyncSubscriptionError.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// Licensed under the Amazon Software License
// http://aws.amazon.com/asl/
//

import Foundation

public enum AWSAppSyncSubscriptionError: Error, LocalizedError {
/// The underlying MQTT client reported a status of "connectionError"
case connectionError

/// The underlying MQTT client reported a status of "connectionRefused"
case connectionRefused

/// The underlying MQTT client reported a status of "disconnected"
case disconnected

/// An error occurred parsing the subscription message received from the service
case messageCallbackError(String)

/// Some other error occurred. See associated value for details
case other(Error)

/// An error occurred parsing the published subscription message
case parseError(Error)

/// The underlying MQTT client reported a status of "protocolError"
case protocolError

/// An error occurred while making the initial subscription request to AppSync, parsing its response, or
/// evaluating the response's subscription info payload
case setupError(String)

/// The underlying MQTT client reported a status of "unknown"
case unknownMQTTConnectionStatus

public var errorDescription: String? {
switch self {
case .messageCallbackError(let message):
return message
case .other(let error):
return error.localizedDescription
case .parseError(let error):
return error.localizedDescription
case .setupError(let message):
return message
case .unknownMQTTConnectionStatus:
return "MQTT status unknown"
default:
return "Subscription Terminated."
}
}

public var recoverySuggestion: String? {
switch self {
case .other(let error as NSError):
return error.localizedRecoverySuggestion
case .parseError, .unknownMQTTConnectionStatus:
return nil
default:
return "Restart subscription request."
}
}

public var failureReason: String? {
switch self {
case .other(let error as NSError):
return error.localizedFailureReason
case .parseError, .unknownMQTTConnectionStatus:
return nil
case .setupError(let message):
return message
default:
return "Disconnected from service."
}
}
}

extension AWSAppSyncSubscriptionError {
static func from(status: AWSIoTMQTTStatus) -> AWSAppSyncSubscriptionError? {
switch status {
case .connectionError:
return .connectionError
case .connectionRefused:
return .connectionRefused
case .disconnected:
return .disconnected
case .protocolError:
return .protocolError
case .unknown:
return .unknownMQTTConnectionStatus
default:
return nil
}
}
}
Loading

0 comments on commit fdd8679

Please sign in to comment.