-
Notifications
You must be signed in to change notification settings - Fork 200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(datastore): Dispatch outboxStatus, subscriptionsEstablished, syncQueriesStarted events #721
Changes from 23 commits
19ade20
ee22220
5c578a9
e59be63
e6db87a
beaf801
2fc86e6
c12e462
6d27bad
0a03b27
d222390
f2263af
e4315f7
9de6be5
9c5f606
d962a58
2a91473
de6e778
effabe9
fe6fb22
720cbd9
f534255
7645ba2
9c88647
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,4 +23,15 @@ public extension HubPayload.EventName.DataStore { | |
/// Dispatched when DataStore receives a sync response from the remote API via the API category. The Hub Payload | ||
/// will be a `MutationEvent` instance that caused the conditional save failed. | ||
static let conditionalSaveFailed = "DataStore.conditionalSaveFailed" | ||
|
||
/// Dispatched on DataStore start and also every time a local mutation is enqueued and processed in the outbox | ||
/// HubPayload `OutboxStatusEvent` contains a boolean value `isEmpty` to notify if there are mutations in the outbox | ||
static let outboxStatus = "DataStore.outboxStatus" | ||
|
||
/// Dispatched when all of the subscriptions to syncable models have been established | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "Dispatched when DataStore has finished establishing its subscriptions to all syncable models" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated |
||
static let subscriptionsEstablished = "DataStore.subscriptionEstablished" | ||
|
||
/// Dispatched when DataStore is about to start sync queries | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "Dispatched when DataStore starts establishing its subscriptions to all syncable models" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For |
||
/// HubPayload `syncQueriesStartedEvent` contains an array of each model's `name` | ||
static let syncQueriesStarted = "DataStore.syncQueriesStarted" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
// | ||
// Copyright 2018-2020 Amazon.com, | ||
// Inc. or its affiliates. All Rights Reserved. | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// | ||
|
||
import Foundation | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unnecessary import There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed |
||
|
||
lawmicha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/// Used as HubPayload for the `OutboxStatus` | ||
public struct OutboxStatusEvent { | ||
/// status of outbox: empty or not | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. " |
||
public let isEmpty: Bool | ||
|
||
public init(isEmpty: Bool) { | ||
self.isEmpty = isEmpty | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
// | ||
// Copyright 2018-2020 Amazon.com, | ||
// Inc. or its affiliates. All Rights Reserved. | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// | ||
|
||
import Foundation | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unnecessary import There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed |
||
|
||
/// Used as HubPayload for the `SyncQueriesStarted` | ||
public struct SyncQueriesStartedEvent { | ||
/// list of model names | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "A list of all model names for which DataStore has started establishing subscriptions" There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated |
||
public let models: [String] | ||
|
||
public init(models: [String]) { | ||
self.models = models | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -138,10 +138,12 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { | |
mutationEventPublisher: MutationEventPublisher) { | ||
log.verbose(#function) | ||
self.api = api | ||
operationQueue.isSuspended = false | ||
|
||
// State machine notification to ".receivedSubscription" will be handled in `receive(subscription:)` | ||
mutationEventPublisher.publisher.subscribe(self) | ||
queryMutationEventsFromStorage(onComplete: { | ||
self.operationQueue.isSuspended = false | ||
// State machine notification to ".receivedSubscription" will be handled in `receive(subscription:)` | ||
mutationEventPublisher.publisher.subscribe(self) | ||
}) | ||
} | ||
|
||
// MARK: - Event loop processing | ||
|
@@ -187,6 +189,8 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { | |
"[SyncMutationToCloudOperation] mutationEvent finished: \(mutationEvent.id); result: \(result)") | ||
self.processSyncMutationToCloudResult(result, mutationEvent: mutationEvent, api: api) | ||
} | ||
|
||
dispatchOutboxStatusEvent(isEmpty: false) | ||
operationQueue.addOperation(syncMutationToCloudOperation) | ||
stateMachine.notify(action: .enqueuedEvent) | ||
} | ||
|
@@ -241,10 +245,36 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior { | |
self.log.verbose("mutationEvent deleted successfully") | ||
} | ||
|
||
self.dispatchOutboxStatusEvent(isEmpty: true) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How do you know the outbox is empty here? Shouldn't you be querying the mutation events from storage, or checking the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, it makes sense. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As i understand it, we were emitting outboxStatus events to this logic (this is probably wrong then)
example output There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. or is the expected behavior just a ping of the outbox status? for example There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @lawmicha the second behavior is the expected one. For reference, this is the JS implementation: It checks the queue |
||
self.stateMachine.notify(action: .processedEvent) | ||
} | ||
} | ||
|
||
private func queryMutationEventsFromStorage(onComplete: @escaping (() -> Void)) { | ||
let fields = MutationEvent.keys | ||
let predicate = fields.inProcess == false || fields.inProcess == nil | ||
|
||
storageAdapter.query(MutationEvent.self, | ||
predicate: predicate, | ||
sort: nil, | ||
paginationInput: nil) { result in | ||
switch result { | ||
case .success(let events): | ||
self.dispatchOutboxStatusEvent(isEmpty: events.isEmpty) | ||
case .failure(let error): | ||
log.error("Error quering mutation events: \(error)") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/quering/querying/ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated |
||
} | ||
onComplete() | ||
} | ||
} | ||
|
||
private func dispatchOutboxStatusEvent(isEmpty: Bool) { | ||
let outboxStatusEvent = OutboxStatusEvent(isEmpty: isEmpty) | ||
let outboxStatusEventPayload = HubPayload(eventName: HubPayload.EventName.DataStore.outboxStatus, | ||
data: outboxStatusEvent) | ||
Amplify.Hub.dispatch(to: .dataStore, payload: outboxStatusEventPayload) | ||
} | ||
|
||
} | ||
|
||
@available(iOS 13.0, *) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
// | ||
// Copyright 2018-2020 Amazon.com, | ||
// Inc. or its affiliates. All Rights Reserved. | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// | ||
|
||
import XCTest | ||
|
||
import AmplifyPlugins | ||
import AWSPluginsCore | ||
|
||
@testable import Amplify | ||
@testable import AmplifyTestCommon | ||
@testable import AWSDataStoreCategoryPlugin | ||
|
||
@available(iOS 13.0, *) | ||
class DataStoreHubEventTests: HubEventsIntegrationTestBase { | ||
|
||
/// - Given: | ||
/// - registered two models from `TestModelRegistration` | ||
/// - no pending MutationEvents in MutationEvent database | ||
/// - When: | ||
/// - DataStore's remote sync engine is initialized | ||
/// - Then: | ||
/// - subscriptionEstablished received, payload should be nil | ||
/// - syncQueriesStarted received, payload should be: {models: ["Post", "Comment"]} | ||
/// - outboxStatus received, payload should be {isEmpty: true} | ||
func testDataStoreConfiguredDispatchesHubEvents() throws { | ||
|
||
let subscriptionsEstablishedReceived = expectation(description: "subscriptionsEstablished received") | ||
let syncQueriesStartedReceived = expectation(description: "syncQueriesStarted received") | ||
let outboxStatusReceived = expectation(description: "outboxStatus received") | ||
|
||
let hubListener = Amplify.Hub.listen(to: .dataStore) { payload in | ||
if payload.eventName == HubPayload.EventName.DataStore.subscriptionsEstablished { | ||
XCTAssertNil(payload.data) | ||
subscriptionsEstablishedReceived.fulfill() | ||
} | ||
|
||
if payload.eventName == HubPayload.EventName.DataStore.syncQueriesStarted { | ||
guard let syncQueriesStartedEvent = payload.data as? SyncQueriesStartedEvent else { | ||
XCTFail("Failed to cast payload data as SyncQueriesStartedEvent") | ||
return | ||
} | ||
XCTAssertEqual(syncQueriesStartedEvent.models.count, 2) | ||
lawmicha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
syncQueriesStartedReceived.fulfill() | ||
} | ||
|
||
if payload.eventName == HubPayload.EventName.DataStore.outboxStatus { | ||
guard let outboxStatusEvent = payload.data as? OutboxStatusEvent else { | ||
XCTFail("Failed to cast payload data as OutboxStatusEvent") | ||
return | ||
} | ||
XCTAssertTrue(outboxStatusEvent.isEmpty) | ||
outboxStatusReceived.fulfill() | ||
} | ||
} | ||
|
||
guard try HubListenerTestUtilities.waitForListener(with: hubListener, timeout: 5.0) else { | ||
XCTFail("Listener not registered for hub") | ||
return | ||
} | ||
|
||
waitForExpectations(timeout: networkTimeout, handler: nil) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
// | ||
// Copyright 2018-2020 Amazon.com, | ||
// Inc. or its affiliates. All Rights Reserved. | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
// | ||
|
||
import XCTest | ||
|
||
import AmplifyPlugins | ||
import AWSMobileClient | ||
|
||
@testable import Amplify | ||
@testable import AmplifyTestCommon | ||
@testable import AWSDataStoreCategoryPlugin | ||
|
||
class HubEventsIntegrationTestBase: XCTestCase { | ||
lawmicha marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
static let networkTimeout = TimeInterval(180) | ||
let networkTimeout = HubEventsIntegrationTestBase.networkTimeout | ||
|
||
override func setUp() { | ||
super.setUp() | ||
|
||
continueAfterFailure = false | ||
|
||
let bundle = Bundle(for: type(of: self)) | ||
guard let configFile = bundle.url(forResource: "amplifyconfiguration", withExtension: "json") else { | ||
XCTFail("Could not get URL for amplifyconfiguration.json from \(bundle)") | ||
return | ||
} | ||
|
||
do { | ||
let configData = try Data(contentsOf: configFile) | ||
let amplifyConfig = try JSONDecoder().decode(AmplifyConfiguration.self, from: configData) | ||
try Amplify.add(plugin: AWSAPIPlugin(modelRegistration: TestModelRegistration())) | ||
try Amplify.add(plugin: AWSDataStorePlugin(modelRegistration: TestModelRegistration())) | ||
try Amplify.configure(amplifyConfig) | ||
} catch { | ||
XCTFail(String(describing: error)) | ||
return | ||
} | ||
} | ||
|
||
override func tearDown() { | ||
sleep(1) | ||
print("Amplify reset") | ||
Amplify.reset() | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clarify wording: "Dispatched when: 1) the DataStore starts; 2) each time a local mutation is enqueued into the outbox; 3) each time a local mutation is finished processing."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated