Skip to content

Commit

Permalink
Initial datastore clear() async implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
wooj2 committed Mar 14, 2020
1 parent 3bd47b9 commit c56df02
Show file tree
Hide file tree
Showing 21 changed files with 259 additions and 39 deletions.
4 changes: 4 additions & 0 deletions Amplify/Categories/DataStore/DataStoreCategory+Behavior.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ extension DataStoreCategory: DataStoreBaseBehavior {
completion: @escaping DataStoreCallback<Void>) {
plugin.delete(modelType, withId: id, completion: completion)
}

public func clear(completion: @escaping DataStoreCallback<Void>) {
plugin.clear(completion: completion)
}
}
2 changes: 2 additions & 0 deletions Amplify/Categories/DataStore/DataStoreCategoryBehavior.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public protocol DataStoreBaseBehavior {
func delete<M: Model>(_ modelType: M.Type,
withId id: String,
completion: @escaping DataStoreCallback<Void>)

func clear(completion: @escaping DataStoreCallback<Void>)
}

public protocol DataStoreSubscribeBehavior {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ extension AWSDataStorePlugin: DataStoreBaseBehavior {
public func save<M: Model>(_ model: M,
completion: @escaping DataStoreCallback<M>) {
log.verbose("Saving: \(model)")

reinitStorageEngineIfNeeded()
// TODO: Refactor this into a proper request/result where the result includes metadata like the derived
// mutation type
let modelExists: Bool
Expand Down Expand Up @@ -55,6 +55,7 @@ extension AWSDataStorePlugin: DataStoreBaseBehavior {
public func query<M: Model>(_ modelType: M.Type,
byId id: String,
completion: DataStoreCallback<M?>) {
reinitStorageEngineIfNeeded()
let predicate: QueryPredicateFactory = { field("id") == id }
query(modelType, where: predicate) {
switch $0 {
Expand All @@ -74,6 +75,7 @@ extension AWSDataStorePlugin: DataStoreBaseBehavior {
public func query<M: Model>(_ modelType: M.Type,
where predicateFactory: QueryPredicateFactory? = nil,
completion: DataStoreCallback<[M]>) {
reinitStorageEngineIfNeeded()
storageEngine.query(modelType,
predicate: predicateFactory?(),
completion: completion)
Expand All @@ -82,13 +84,15 @@ extension AWSDataStorePlugin: DataStoreBaseBehavior {
public func delete<M: Model>(_ modelType: M.Type,
withId id: String,
completion: @escaping DataStoreCallback<Void>) {
reinitStorageEngineIfNeeded()
storageEngine.delete(modelType,
withId: id,
completion: completion)
}

public func delete<M: Model>(_ model: M,
completion: @escaping DataStoreCallback<Void>) {
reinitStorageEngineIfNeeded()
let publishingCompletion: DataStoreCallback<Void> = { result in
switch result {
case .success:
Expand All @@ -109,6 +113,7 @@ extension AWSDataStorePlugin: DataStoreBaseBehavior {
public func delete<M: Model>(_ modelType: M.Type,
where predicate: @escaping QueryPredicateFactory,
completion: @escaping DataStoreCallback<Void>) {
reinitStorageEngineIfNeeded()
let onCompletion: DataStoreCallback<[M]> = { result in
switch result {
case .success(let models):
Expand All @@ -124,6 +129,41 @@ extension AWSDataStorePlugin: DataStoreBaseBehavior {
predicate: predicate(),
completion: onCompletion)
}

public func clear(completion: @escaping DataStoreCallback<Void>) {
if storageEngine == nil {
completion(.successfulVoid)
return
}
storageEngine.clear { result in
var sError: Error?

switch result {
case .success:
self.log.verbose("Successfully called clear() on storage engine")
case .failure(let err):
sError = err
self.log.error("Failed to clear() storage engine")
}

//If we failed to kill the storage engine, we should
// continue to attempt to kill the storage engine & publishers
self.storageEngine = nil
if #available(iOS 13.0, *) {
if let publisher = self.dataStorePublisher as? DataStorePublisher {
publisher.sendFinished()
}
}
self.dataStorePublisher = nil

if let error = sError {
completion(.failure(causedBy: error))
} else {
completion(.successfulVoid)
}
}
}

// MARK: Private

private func publishMutationEvent<M: Model>(from model: M,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ extension AWSDataStorePlugin: DataStoreSubscribeBehavior {
@available(iOS 13.0, *)
public func publisher<M: Model>(for modelType: M.Type)
-> AnyPublisher<MutationEvent, DataStoreError> {
reinitStorageEngineIfNeeded()
// Force-unwrapping: The optional 'dataStorePublisher' is expected
// to exist for deployment targets >=iOS13.0
return dataStorePublisher!.publisher(for: modelType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ final public class AWSDataStorePlugin: DataStoreCategoryPlugin {
var isSyncEnabled: Bool

/// The Publisher that sends mutation events to subscribers
let dataStorePublisher: DataStoreSubscribeBehavior?
var dataStorePublisher: DataStoreSubscribeBehavior?

let modelRegistration: AmplifyModelRegistration

Expand Down Expand Up @@ -78,22 +78,38 @@ final public class AWSDataStorePlugin: DataStoreCategoryPlugin {
}
}

// MARK: Private
func resolveStorageEngine() throws {
guard storageEngine == nil else {
return
}

private func resolveSyncEnabled() {
storageEngine = try StorageEngine(isSyncEnabled: isSyncEnabled)
if #available(iOS 13.0, *) {
isSyncEnabled = ModelRegistry.hasSyncableModels
setupStorageSink()
}
}

private func resolveStorageEngine() throws {
guard storageEngine == nil else {
func reinitStorageEngineIfNeeded() {
if storageEngine != nil {
return
}
do {
if #available(iOS 13.0, *) {
self.dataStorePublisher = DataStorePublisher()
}
try resolveStorageEngine()
try storageEngine.setUp(models: ModelRegistry.models)
storageEngine.startSync()
} catch {
log.error(error: error)
}
}

storageEngine = try StorageEngine(isSyncEnabled: isSyncEnabled)
// MARK: Private

private func resolveSyncEnabled() {
if #available(iOS 13.0, *) {
setupStorageSink()
isSyncEnabled = ModelRegistry.hasSyncableModels
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import AWSPluginsCore
/// an integration layer between the AppSyncLocal `StorageEngine` and SQLite for local storage.
final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
internal var connection: Connection!
private var dbFilePath: URL?

convenience init(databaseName: String = "database") throws {
guard let documentsPath = getDocumentPath() else {
preconditionFailure("Could not create the database. The `.documentDirectory` is invalid")
}
let path = documentsPath.appendingPathComponent("\(databaseName).db").absoluteString
let dbFilePath = documentsPath.appendingPathComponent("\(databaseName).db")
let path = dbFilePath.absoluteString

let connection: Connection
do {
Expand All @@ -28,11 +30,12 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
throw DataStoreError.invalidDatabase(path: path, error)
}

try self.init(connection: connection)
try self.init(connection: connection, dbFilePath: dbFilePath)
}

internal init(connection: Connection) throws {
internal init(connection: Connection, dbFilePath: URL? = nil) throws {
self.connection = connection
self.dbFilePath = dbFilePath
try SQLiteStorageEngineAdapter.initializeDatabase(connection: connection)
log.verbose("Initialized \(connection)")
}
Expand Down Expand Up @@ -223,6 +226,23 @@ final class SQLiteStorageEngineAdapter: StorageEngineAdapter {
try transactionBlock()
}
}

func clear(completion: @escaping DataStoreCallback<Void>) {
guard let dbFilePath = dbFilePath else {
log.error("Attempt to clear DB, but file path was empty")
completion(.failure(causedBy: DataStoreError.invalidDatabase(path: "path not set", nil)))
return
}
connection = nil
let fileManager = FileManager.default
do {
try fileManager.removeItem(at: dbFilePath)
} catch {
log.error("Failed to delete database file located at: \(dbFilePath), error: \(error)")
completion(.failure(causedBy: DataStoreError.invalidDatabase(path: dbFilePath.absoluteString, error)))
}
completion(.successfulVoid)
}
}

// MARK: - Private Helpers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,16 @@ final class StorageEngine: StorageEngineBehavior {
syncEngine?.start(api: Amplify.API)
}

func clear(completion: @escaping DataStoreCallback<Void>) {
if let syncEngine = syncEngine {
syncEngine.stop(completion: { _ in
self.storageAdapter.clear(completion: completion)
})
} else {
storageAdapter.clear(completion: completion)
}
}

func reset(onComplete: () -> Void) {
// TOOD: Perform cleanup on StorageAdapter, including releasing its `Connection` if needed
let group = DispatchGroup()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@ protocol StorageEngineAdapter: class, ModelStorageBehavior {
func queryModelSyncMetadata(for modelType: Model.Type) throws -> ModelSyncMetadata?

func transaction(_ basicClosure: BasicThrowableClosure) throws

func clear(completion: @escaping DataStoreCallback<Void>)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ protocol StorageEngineBehavior: class, ModelStorageBehavior {

/// Tells the StorageEngine to begin syncing, if sync is enabled
func startSync()

func clear(completion: @escaping DataStoreCallback<Void>)
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ final class AWSMutationEventPublisher: Publisher {

func cancel() {
subscription = nil
eventSource = nil
}

func request(_ demand: Subscribers.Demand) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ import Combine

/// Publishes mutation events to downstream subscribers for subsequent sync to the API.
@available(iOS 13.0, *)
protocol MutationEventPublisher: class {
protocol MutationEventPublisher: class, Cancellable {
var publisher: AnyPublisher<MutationEvent, DataStoreError> { get }
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ final class OutgoingMutationQueue: OutgoingMutationQueueBehavior {
let dataStoreError = DataStoreError.configuration(
"API is unexpectedly nil",
"""
The reference to storageAdapter has been released while an ongoing mutation was being processed.
The reference to api has been released while an ongoing mutation was being processed.
\(AmplifyErrorMessages.reportBugToAWS())
"""
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ extension RemoteSyncEngine {
case activatedCloudSubscriptions(APICategoryGraphQLBehavior, MutationEventPublisher)
case activatedMutationQueue
case notifiedSyncStarted
case cleanedUp(AmplifyError?)
case scheduleRestart(AmplifyError?)
case cleanedUp(AmplifyError)
case cleanedUpForTermination
case scheduleRestart(AmplifyError)
case scheduleRestartFinished

// Terminal actions
Expand Down Expand Up @@ -52,6 +53,8 @@ extension RemoteSyncEngine {
return "notifiedSyncStarted"
case .cleanedUp:
return "cleanedUp"
case .cleanedUpForTermination:
return "cleanedUpForTermination"
case .scheduleRestart:
return "scheduleRestart"
case .scheduleRestartFinished:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,42 +27,35 @@ extension RemoteSyncEngine {
return .performingInitialSync
case (.initializingSubscriptions, .errored(let error)):
return .cleaningUp(error)
case (.initializingSubscriptions, .finished):
return .cleaningUp(nil)


case (.performingInitialSync, .performedInitialSync):
return .activatingCloudSubscriptions
case (.performingInitialSync, .errored(let error)):
return .cleaningUp(error)
case (.performingInitialSync, .finished):
return .cleaningUp(nil)


case (.activatingCloudSubscriptions, .activatedCloudSubscriptions(let api, let mutationEventPublisher)):
return .activatingMutationQueue(api, mutationEventPublisher)
case (.activatingCloudSubscriptions, .errored(let error)):
return .cleaningUp(error)
case (.activatingCloudSubscriptions, .finished):
return .cleaningUp(nil)

case (.activatingMutationQueue, .activatedMutationQueue):
return .notifyingSyncStarted
case (.activatingMutationQueue, .errored(let error)):
return .cleaningUp(error)
case (.activatingMutationQueue, .finished):
return .cleaningUp(nil)

case (.notifyingSyncStarted, .notifiedSyncStarted):
return .syncEngineActive

case (.syncEngineActive, .errored(let error)):
return .cleaningUp(error)
case (.syncEngineActive, .finished):
return .cleaningUp(nil)

case (_, .finished):
return .cleaningUpForTermination

case (.cleaningUp, .cleanedUp(let error)):
return .schedulingRestart(error)
case (.cleaningUpForTermination, .cleanedUpForTermination):
return .terminate

case (.schedulingRestart, .scheduleRestartFinished):
return .pausingSubscriptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,20 @@ extension RemoteSyncEngine {
currentAttemptNumber = 1
}

func scheduleRestart(error: AmplifyError?) {
func scheduleRestartOrTerminate(error: AmplifyError) {
let advice = getRetryAdvice(error: error)
if advice.shouldRetry {
scheduleRestart(advice: advice)
} else {
if let error = error {
remoteSyncTopicPublisher.send(completion: .failure(DataStoreError.api(error)))
} else {
remoteSyncTopicPublisher.send(completion: .finished)
remoteSyncTopicPublisher.send(completion: .failure(DataStoreError.api(error)))
if let completionBlock = finishedCompletionBlock {
completionBlock(.failure(causedBy: error))
finishedCompletionBlock = nil
}
}

}

private func getRetryAdvice(error: Error?) -> RequestRetryAdvice {
private func getRetryAdvice(error: Error) -> RequestRetryAdvice {
//TODO: Parse error from the receive completion to use as an input into getting retry advice.
// For now, specifying not connected to internet to force a retry up to our maximum
let urlError = URLError(.notConnectedToInternet)
Expand Down
Loading

0 comments on commit c56df02

Please sign in to comment.