Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(api): storing cancelablles with actor methods in AppSyncRTC #3824

Merged
merged 3 commits into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,15 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
// Placing the actual subscription work in a deferred task and
// promptly returning the filtered publisher for downstream consumption of all error messages.
defer {
Task { [weak self] in
let task = Task { [weak self] in
guard let self = self else { return }
if !(await self.isConnected) {
try await connect()
try await waitForState(.connected)
}
await self.bindCancellableToConnection(try await self.startSubscription(id))
}.toAnyCancellable.store(in: &cancellablesBindToConnection)
await self.storeInConnectionCancellables(try await self.startSubscription(id))
}
self.storeInConnectionCancellables(task.toAnyCancellable)
}

return filterAppSyncSubscriptionEvent(with: id)
Expand Down Expand Up @@ -236,24 +237,29 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
}

private func subscribeToWebSocketEvent() async {
await self.webSocketClient.publisher.sink { [weak self] _ in
let cancellable = await self.webSocketClient.publisher.sink { [weak self] _ in
self?.log.debug("[AppSyncRealTimeClient] WebSocketClient terminated")
} receiveValue: { webSocketEvent in
Task { [weak self] in
await self?.onWebSocketEvent(webSocketEvent)
}.toAnyCancellable.store(in: &self.cancellables)
let task = Task { [weak self] in
await self?.onWebSocketEvent(webSocketEvent)
}
await self?.storeInCancellables(task.toAnyCancellable)
}
}
.store(in: &cancellables)
self.storeInCancellables(cancellable)
}

private func resumeExistingSubscriptions() {
log.debug("[AppSyncRealTimeClient] Resuming existing subscriptions")
for (id, _) in self.subscriptions {
Task {
Task { [weak self] in
do {
try await self.startSubscription(id).store(in: &cancellablesBindToConnection)
if let cancellable = try await self?.startSubscription(id) {
await self?.storeInConnectionCancellables(cancellable)
}
} catch {
log.debug("[AppSyncRealTimeClient] Failed to resume existing subscription with id: (\(id))")
Self.log.debug("[AppSyncRealTimeClient] Failed to resume existing subscription with id: (\(id))")
}
}
}
Expand Down Expand Up @@ -286,7 +292,7 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
subject.filter {
switch $0 {
case .success(let response): return response.id == id || response.type == .connectionError
case .failure(let error): return true
case .failure: return true
}
}
.map { result -> AppSyncSubscriptionEvent? in
Expand Down Expand Up @@ -350,10 +356,6 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
return errors.compactMap(AppSyncRealTimeRequest.parseResponseError(error:))
}

private func bindCancellableToConnection(_ cancellable: AnyCancellable) {
cancellable.store(in: &cancellablesBindToConnection)
}

}

// MARK: - On WebSocket Events
Expand All @@ -366,8 +368,11 @@ extension AppSyncRealTimeClient {
if self.state.value == .connectionDropped {
log.debug("[AppSyncRealTimeClient] reconnecting appSyncClient after connection drop")
Task { [weak self] in
try? await self?.connect()
}.toAnyCancellable.store(in: &cancellablesBindToConnection)
let task = Task { [weak self] in
try? await self?.connect()
}
await self?.storeInConnectionCancellables(task.toAnyCancellable)
}
}

case let .disconnected(closeCode, reason): //
Expand Down Expand Up @@ -425,24 +430,37 @@ extension AppSyncRealTimeClient {
}
}

private func monitorHeartBeats(_ connectionAck: JSONValue?) {
func monitorHeartBeats(_ connectionAck: JSONValue?) {
let timeoutMs = connectionAck?.connectionTimeoutMs?.intValue ?? 0
log.debug("[AppSyncRealTimeClient] Starting heart beat monitor with interval \(timeoutMs) ms")
heartBeats.eraseToAnyPublisher()
let cancellable = heartBeats.eraseToAnyPublisher()
.debounce(for: .milliseconds(timeoutMs), scheduler: DispatchQueue.global())
.first()
.sink(receiveValue: {
self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting")
.sink(receiveValue: { [weak self] in
Self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting")
Task { [weak self] in
await self?.reconnect()
}.toAnyCancellable.store(in: &self.cancellables)
let task = Task { [weak self] in
await self?.reconnect()
}
await self?.storeInCancellables(task.toAnyCancellable)
}
})
.store(in: &cancellablesBindToConnection)
self.storeInConnectionCancellables(cancellable)
// start counting down
heartBeats.send(())
}
}

extension AppSyncRealTimeClient {
private func storeInCancellables(_ cancellable: AnyCancellable) {
self.cancellables.insert(cancellable)
}

private func storeInConnectionCancellables(_ cancellable: AnyCancellable) {
self.cancellablesBindToConnection.insert(cancellable)
}
}

extension Publisher where Output == AppSyncRealTimeSubscription.State, Failure == Never {
func toAppSyncSubscriptionEventStream() -> AnyPublisher<AppSyncSubscriptionEvent, Never> {
self.compactMap { subscriptionState -> AppSyncSubscriptionEvent? in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,4 +551,32 @@ class AppSyncRealTimeClientTests: XCTestCase {
await fulfillment(of: [startTriggered, errorReceived], timeout: 2)

}

func testReconnect_whenHeartBeatSignalIsNotReceived() async throws {
var cancellables = Set<AnyCancellable>()
let timeout = 1.0
let mockWebSocketClient = MockWebSocketClient()
let mockAppSyncRequestInterceptor = MockAppSyncRequestInterceptor()
let appSyncClient = AppSyncRealTimeClient(
endpoint: URL(string: "https://example.com")!,
requestInterceptor: mockAppSyncRequestInterceptor,
webSocketClient: mockWebSocketClient
)

// start monitoring
await appSyncClient.monitorHeartBeats(.object([
"connectionTimeoutMs": 100
]))

let reconnect = expectation(description: "webSocket triggers event to connection")
await mockWebSocketClient.actionSubject.sink { action in
switch action {
case .connect:
reconnect.fulfill()
default: break
}
}.store(in: &cancellables)
await fulfillment(of: [reconnect], timeout: 2)
}

}
Loading