-
Notifications
You must be signed in to change notification settings - Fork 199
/
AppSyncRealTimeClient.swift
506 lines (442 loc) · 18.4 KB
/
AppSyncRealTimeClient.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
//
// Copyright Amazon.com Inc. or its affiliates.
// All Rights Reserved.
//
// SPDX-License-Identifier: Apache-2.0
//
import Foundation
import Amplify
import Combine
@_spi(WebSocket) import AWSPluginsCore
/**
The AppSyncRealTimeClient conforms to the AppSync real-time WebSocket protocol.
ref: https://docs.aws.amazon.com/appsync/latest/devguide/real-time-websocket-client.html
*/
actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
static let jsonEncoder = JSONEncoder()
static let jsonDecoder = JSONDecoder()
enum State {
case none
case connecting
case connected
case connectionDropped
case disconnecting
case disconnected
}
/// Internal state for tracking AppSync connection
private let state = CurrentValueSubject<State, Never>(.none)
/// Subscriptions created using this client
private var subscriptions = [String: AppSyncRealTimeSubscription]()
/// heart beat stream to keep connection alive
private let heartBeats = PassthroughSubject<Void, Never>()
/// Cancellables bind to instance life cycle
private var cancellables = Set<AnyCancellable>()
/// Cancellables bind to connection life cycle
private var cancellablesBindToConnection = Set<AnyCancellable>()
/// AppSync RealTime server endpoint
internal let endpoint: URL
/// Interceptor for decorating AppSyncRealTimeRequest
internal let requestInterceptor: AppSyncRequestInterceptor
/// WebSocketClient offering connections at the WebSocket protocol level
internal var webSocketClient: AppSyncWebSocketClientProtocol
/// Writable data stream convert WebSocketEvent to AppSyncRealTimeResponse
internal let subject = PassthroughSubject<Result<AppSyncRealTimeResponse, Error>, Never>()
var isConnected: Bool {
self.state.value == .connected
}
internal var numberOfSubscriptions: Int {
self.subscriptions.count
}
/**
Creates a new AppSyncRealTimeClient with endpoint, requestInterceptor and webSocketClient.
- Parameters:
- endpoint: AppSync real-time server endpoint
- requestInterceptor: Interceptor for decocating AppSyncRealTimeRequest
- webSocketClient: WebSocketClient for reading/writing to connection
*/
init(
endpoint: URL,
requestInterceptor: AppSyncRequestInterceptor,
webSocketClient: AppSyncWebSocketClientProtocol
) {
self.endpoint = endpoint
self.requestInterceptor = requestInterceptor
self.webSocketClient = webSocketClient
Task { await self.subscribeToWebSocketEvent() }
}
deinit {
log.debug("Deinit AppSyncRealTimeClient")
subject.send(completion: .finished)
cancellables = Set()
cancellablesBindToConnection = Set()
}
/**
Connecting to remote AppSync real-time server.
*/
func connect() async throws {
switch self.state.value {
case .connecting, .connected:
log.debug("[AppSyncRealTimeClient] client is already connecting or connected")
return
case .disconnecting:
try await waitForState(.disconnected)
case .connectionDropped, .disconnected, .none:
break
}
guard self.state.value != .connecting else {
log.debug("[AppSyncRealTimeClient] actor reentry, state has been changed to connecting")
return
}
self.state.send(.connecting)
log.debug("[AppSyncRealTimeClient] client start connecting")
try await RetryWithJitter.execute { [weak self] in
guard let self else { return }
await self.webSocketClient.connect(
autoConnectOnNetworkStatusChange: true,
autoRetryOnConnectionFailure: true
)
try await self.sendRequest(.connectionInit)
}
}
/**
Disconnect only when there are no subscriptions exist.
*/
func disconnectWhenIdel() async {
if self.subscriptions.isEmpty {
log.debug("[AppSyncRealTimeClient] no subscription exist, client is trying to disconnect")
await disconnect()
} else {
log.debug("[AppSyncRealTimeClient] client only try to disconnect when no subscriptions exist")
}
}
/**
Disconnect from AppSync real-time server.
*/
func disconnect() async {
guard self.state.value != .disconnecting else {
log.debug("[AppSyncRealTimeClient] client already disconnecting")
return
}
defer { self.state.send(.disconnected) }
log.debug("[AppSyncRealTimeClient] client start disconnecting")
self.state.send(.disconnecting)
self.cancellablesBindToConnection = Set()
await self.webSocketClient.disconnect()
log.debug("[AppSyncRealTimeClient] client is disconnected")
}
/**
Subscribing to a query with unique identifier.
- Parameters:
- id: unique identifier
- query: GraphQL query for subscription
- Returns:
A never fail data stream for AppSyncSubscriptionEvent.
*/
func subscribe(id: String, query: String) async throws -> AnyPublisher<AppSyncSubscriptionEvent, Never> {
log.debug("[AppSyncRealTimeClient] Received subscription request id: \(id), query: \(query)")
let subscription = AppSyncRealTimeSubscription(id: id, query: query, appSyncRealTimeClient: self)
subscriptions[id] = subscription
// Placing the actual subscription work in a deferred task and
// promptly returning the filtered publisher for downstream consumption of all error messages.
defer {
let task = Task { [weak self] in
guard let self = self else { return }
if !(await self.isConnected) {
try await connect()
try await waitForState(.connected)
}
await self.storeInConnectionCancellables(try await self.startSubscription(id))
}
self.storeInConnectionCancellables(task.toAnyCancellable)
}
return filterAppSyncSubscriptionEvent(with: id)
.merge(with: (await subscription.publisher).toAppSyncSubscriptionEventStream())
.eraseToAnyPublisher()
}
private func waitForState(_ targetState: State) async throws {
var cancellables = Set<AnyCancellable>()
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Swift.Error>) -> Void in
state.filter { $0 == targetState }
.setFailureType(to: AppSyncRealTimeRequest.Error.self)
.timeout(.seconds(10), scheduler: DispatchQueue.global())
.first()
.sink { completion in
switch completion {
case .finished:
continuation.resume(returning: ())
case .failure(let error):
continuation.resume(throwing: error)
}
} receiveValue: { _ in }
.store(in: &cancellables)
}
}
/**
Unsubscribe a subscription with unique identifier.
- Parameters:
- id: unique identifier of the subscription.
*/
func unsubscribe(id: String) async throws {
defer {
log.debug("[AppSyncRealTimeClient] deleted subscription with id: \(id)")
subscriptions.removeValue(forKey: id)
}
guard let subscription = subscriptions[id] else {
log.debug("[AppSyncRealTimeClient] start subscription failed, could not found subscription with id \(id) ")
return
}
log.debug("[AppSyncRealTimeClient] unsubscribing: \(id)")
try await subscription.unsubscribe()
}
private func startSubscription(_ id: String) async throws -> AnyCancellable {
guard let subscription = subscriptions[id] else {
log.debug("[AppSyncRealTimeClient] start subscription failed, could not found subscription with id \(id) ")
throw APIError.unknown("Could not find a subscription with id \(id)", "", nil)
}
try await subscription.subscribe()
return AnyCancellable {
Task {
try await subscription.unsubscribe()
}
}
}
private func subscribeToWebSocketEvent() async {
let cancellable = await self.webSocketClient.publisher.sink { [weak self] _ in
self?.log.debug("[AppSyncRealTimeClient] WebSocketClient terminated")
} receiveValue: { webSocketEvent in
Task { [weak self] in
let task = Task { [weak self] in
await self?.onWebSocketEvent(webSocketEvent)
}
await self?.storeInCancellables(task.toAnyCancellable)
}
}
self.storeInCancellables(cancellable)
}
private func resumeExistingSubscriptions() {
log.debug("[AppSyncRealTimeClient] Resuming existing subscriptions")
for (id, _) in self.subscriptions {
Task { [weak self] in
do {
if let cancellable = try await self?.startSubscription(id) {
await self?.storeInConnectionCancellables(cancellable)
}
} catch {
Self.log.debug("[AppSyncRealTimeClient] Failed to resume existing subscription with id: (\(id))")
}
}
}
}
nonisolated private func writeAppSyncEvent(_ event: AppSyncRealTimeRequest) async throws {
guard await self.webSocketClient.isConnected else {
log.debug("[AppSyncRealTimeClient] Attempting to write to a webSocket haven't been connected.")
return
}
let interceptedEvent = await self.requestInterceptor.interceptRequest(event: event, url: self.endpoint)
let eventString = try String(data: Self.jsonEncoder.encode(interceptedEvent), encoding: .utf8)!
log.debug("[AppSyncRealTimeClient] Writing AppSyncEvent \(eventString)")
try await webSocketClient.write(message: eventString)
}
/**
Filter response to downstream by id.
- Parameters:
- id: subscription identifier
- Returns:
- AppSyncSubscriptionEvent data stream related to subscription
- important: connection errors will also be passed to downstreams
*/
private func filterAppSyncSubscriptionEvent(
with id: String
) -> AnyPublisher<AppSyncSubscriptionEvent, Never> {
subject.filter {
switch $0 {
case .success(let response): return response.id == id || response.type == .connectionError
case .failure: return true
}
}
.map { result -> AppSyncSubscriptionEvent? in
switch result {
case .success(let response):
switch response.type {
case .connectionError, .error:
return .error(Self.decodeAppSyncRealTimeResponseError(response.payload))
case .data:
return response.payload.map { .data($0) }
default:
return nil
}
case .failure(let error):
return .error([error])
}
}
.compactMap { $0 }
.eraseToAnyPublisher()
}
private func reconnect() async {
do {
log.debug("[AppSyncRealTimeClient] Reconnecting")
await disconnect()
try await connect()
} catch {
log.debug("[AppSyncRealTimeClient] Failed to reconnect, error: \(error)")
}
}
private static func decodeAppSyncRealTimeResponseError(_ data: JSONValue?) -> [Error] {
let knownAppSyncRealTimeRequestErorrs =
Self.decodeAppSyncRealTimeRequestError(data)
.filter { !$0.isUnknown }
if knownAppSyncRealTimeRequestErorrs.isEmpty {
let graphQLErrors = Self.decodeGraphQLErrors(data)
return graphQLErrors.isEmpty
? [APIError.operationError("Failed to decode AppSync error response", "", nil)]
: graphQLErrors
} else {
return knownAppSyncRealTimeRequestErorrs
}
}
private static func decodeGraphQLErrors(_ data: JSONValue?) -> [GraphQLError] {
do {
return try GraphQLErrorDecoder.decodeAppSyncErrors(data)
} catch {
log.debug("[AppSyncRealTimeClient] Failed to decode errors: \(error)")
return []
}
}
private static func decodeAppSyncRealTimeRequestError(_ data: JSONValue?) -> [AppSyncRealTimeRequest.Error] {
guard let errorsJson = data?.errors else {
log.error("[AppSyncRealTimeClient] No 'errors' field found in response json")
return []
}
let errors = errorsJson.asArray ?? [errorsJson]
return errors.compactMap(AppSyncRealTimeRequest.parseResponseError(error:))
}
}
// MARK: - On WebSocket Events
extension AppSyncRealTimeClient {
private func onWebSocketEvent(_ event: WebSocketEvent) {
log.debug("[AppSyncRealTimeClient] Received websocket event \(event)")
switch event {
case .connected:
log.debug("[AppSyncRealTimeClient] WebSocket connected")
if self.state.value == .connectionDropped {
log.debug("[AppSyncRealTimeClient] reconnecting appSyncClient after connection drop")
Task { [weak self] in
let task = Task { [weak self] in
try? await self?.connect()
}
await self?.storeInConnectionCancellables(task.toAnyCancellable)
}
}
case let .disconnected(closeCode, reason): //
log.debug("[AppSyncRealTimeClient] WebSocket disconnected with closeCode: \(closeCode), reason: \(String(describing: reason))")
if self.state.value != .disconnecting || self.state.value != .disconnected {
self.state.send(.connectionDropped)
}
self.cancellablesBindToConnection = Set()
case .error(let error):
// Propagate connection error to downstream for Sync engine to restart
log.debug("[AppSyncRealTimeClient] WebSocket error event: \(error)")
self.subject.send(.failure(error))
case .string(let string):
guard let data = string.data(using: .utf8) else {
log.debug("[AppSyncRealTimeClient] Failed to decode string \(string)")
return
}
guard let response = try? Self.jsonDecoder.decode(AppSyncRealTimeResponse.self, from: data) else {
log.debug("[AppSyncRealTimeClient] Failed to decode string to AppSync event")
return
}
self.onAppSyncRealTimeResponse(response)
case .data(let data):
guard let response = try? Self.jsonDecoder.decode(AppSyncRealTimeResponse.self, from: data) else {
log.debug("[AppSyncRealTimeClient] Failed to decode data to AppSync event")
return
}
self.onAppSyncRealTimeResponse(response)
}
}
}
// MARK: - On AppSyncServer Event
extension AppSyncRealTimeClient {
/// handles connection level response and passes request level response to downstream
private func onAppSyncRealTimeResponse(_ event: AppSyncRealTimeResponse) {
switch event.type {
case .connectionAck:
log.debug("[AppSyncRealTimeClient] AppSync connected: \(String(describing: event.payload))")
subject.send(.success(event))
self.resumeExistingSubscriptions()
self.state.send(.connected)
self.monitorHeartBeats(event.payload)
case .keepAlive:
self.heartBeats.send(())
default:
log.debug("[AppSyncRealTimeClient] AppSync received response: \(event)")
subject.send(.success(event))
}
}
func monitorHeartBeats(_ connectionAck: JSONValue?) {
let timeoutMs = connectionAck?.connectionTimeoutMs?.intValue ?? 0
log.debug("[AppSyncRealTimeClient] Starting heart beat monitor with interval \(timeoutMs) ms")
let cancellable = heartBeats.eraseToAnyPublisher()
.debounce(for: .milliseconds(timeoutMs), scheduler: DispatchQueue.global())
.first()
.sink(receiveValue: { [weak self] in
Self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting")
Task { [weak self] in
let task = Task { [weak self] in
await self?.reconnect()
}
await self?.storeInCancellables(task.toAnyCancellable)
}
})
self.storeInConnectionCancellables(cancellable)
// start counting down
heartBeats.send(())
}
}
extension AppSyncRealTimeClient {
private func storeInCancellables(_ cancellable: AnyCancellable) {
self.cancellables.insert(cancellable)
}
private func storeInConnectionCancellables(_ cancellable: AnyCancellable) {
self.cancellablesBindToConnection.insert(cancellable)
}
}
extension Publisher where Output == AppSyncRealTimeSubscription.State, Failure == Never {
func toAppSyncSubscriptionEventStream() -> AnyPublisher<AppSyncSubscriptionEvent, Never> {
self.compactMap { subscriptionState -> AppSyncSubscriptionEvent? in
switch subscriptionState {
case .subscribing: return .subscribing
case .subscribed: return .subscribed
case .unsubscribed: return .unsubscribed
default: return nil
}
}
.eraseToAnyPublisher()
}
}
extension AppSyncRealTimeClient: DefaultLogger {
static var log: Logger {
Amplify.Logging.logger(forCategory: CategoryType.api.displayName, forNamespace: String(describing: self))
}
nonisolated var log: Logger { Self.log }
}
extension AppSyncRealTimeClient: Resettable {
func reset() async {
subject.send(completion: .finished)
cancellables = Set()
cancellablesBindToConnection = Set()
if let resettableWebSocketClient = webSocketClient as? Resettable {
await resettableWebSocketClient.reset()
}
}
}
fileprivate extension Task {
var toAnyCancellable: AnyCancellable {
AnyCancellable {
if !self.isCancelled {
self.cancel()
}
}
}
}