Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SnapshotStore can reply with SnapshotNotFound if it is saving an EntitySnapshot #183

Merged
merged 3 commits into from
Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Snapshot synchronization could remove committed log entries that not be included in snapshots
[#167](https://github.com/lerna-stack/akka-entity-replication/issues/167)
[#PR168](https://github.com/lerna-stack/akka-entity-replication/pull/168)
- SnapshotStore doesn't reply with SnapshotNotFound sometimes
[#182](https://github.com/lerna-stack/akka-entity-replication/issues/182),
[#PR183](https://github.com/lerna-stack/akka-entity-replication/pull/183)

## [v2.1.0] - 2022-03-24
[v2.1.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.0.0...v2.1.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,11 @@ private[entityreplication] class SnapshotStore(
cmd.entityId,
)
case FetchSnapshot(_, replyTo) =>
prevSnapshot.foreach { s =>
replyTo ! SnapshotProtocol.SnapshotFound(s)
prevSnapshot match {
case Some(prevSnapshot) =>
replyTo ! SnapshotProtocol.SnapshotFound(prevSnapshot)
case None =>
replyTo ! SnapshotProtocol.SnapshotNotFound(entityId)
}
}
case _: persistence.SaveSnapshotSuccess =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import lerna.akka.entityreplication.raft.{ ActorSpec, RaftSettings }
import lerna.akka.entityreplication.testkit.KryoSerializable

import scala.concurrent.Promise
import scala.util.Using

object ShardSnapshotStoreFailureSpec {
final case object DummyState extends KryoSerializable
Expand Down Expand Up @@ -93,34 +92,41 @@ class ShardSnapshotStoreFailureSpec
"ShardSnapshotStore (with time-consuming writes)" should {

// Emulates a time-consuming write
class TimeConsumingWriteSnapshotPolicy extends SnapshotStorage.SnapshotPolicies.PolicyType with AutoCloseable {
// Note:
// The promise (`processingResultPromise`) must be fulfilled.
// The succeeding tests will fail unless the promise is fulfilled.
class TimeConsumingWriteSnapshotPolicy extends SnapshotStorage.SnapshotPolicies.PolicyType {
val processingResultPromise = Promise[ProcessingResult]()
override def tryProcess(persistenceId: String, processingUnit: SnapshotOperation): ProcessingResult = {
processingUnit match {
case _: WriteSnapshot => processingResultPromise.future.await
case _ => ProcessingSuccess
}
}
override def close(): Unit = {
def trySuccess(): Unit = {
processingResultPromise.trySuccess(ProcessingSuccess)
}
}

"reply with `SnapshotNotFound` to `FetchSnapshot` if it has no EntitySnapshot and is saving an EntitySnapshot" ignore {
// TODO Change SnapshotStore.savingSnapshot such that this test passes.
"reply with `SnapshotNotFound` to `FetchSnapshot` if it has no EntitySnapshot and is saving an EntitySnapshot" in {
val entityId = generateUniqueEntityId()
val shardSnapshotStore = createShardSnapshotStore()
val metadata = EntitySnapshotMetadata(entityId, LogEntryIndex(1))
val snapshot = EntitySnapshot(metadata, EntityState(DummyState))

Using(new TimeConsumingWriteSnapshotPolicy()) { timeConsumingWriteSnapshotPolicy =>
val timeConsumingWriteSnapshotPolicy = new TimeConsumingWriteSnapshotPolicy()
try {
// Prepare: SnapshotStore is saving the snapshot
snapshotTestKit.withPolicy(timeConsumingWriteSnapshotPolicy)
shardSnapshotStore ! SaveSnapshot(snapshot, replyTo = testActor)

// Test:
shardSnapshotStore ! FetchSnapshot(entityId, replyTo = testActor)
expectMsg(SnapshotNotFound)
expectMsg(SnapshotNotFound(entityId))
} finally {
// Cleanup:
// The succeeding tests will fail unless the promise is fulfilled.
timeConsumingWriteSnapshotPolicy.trySuccess()
}
}

Expand All @@ -134,7 +140,8 @@ class ShardSnapshotStoreFailureSpec
shardSnapshotStore ! SaveSnapshot(firstSnapshot, replyTo = testActor)
expectMsg(SaveSnapshotSuccess(firstSnapshotMetadata))

Using(new TimeConsumingWriteSnapshotPolicy()) { timeConsumingWriteSnapshotPolicy =>
val timeConsumingWriteSnapshotPolicy = new TimeConsumingWriteSnapshotPolicy()
try {
// Prepare: SnapshotStore is saving the second snapshot
snapshotTestKit.withPolicy(timeConsumingWriteSnapshotPolicy)
val secondSnapshot =
Expand All @@ -144,6 +151,10 @@ class ShardSnapshotStoreFailureSpec
// Test:
shardSnapshotStore ! FetchSnapshot(entityId, replyTo = testActor)
expectMsg(SnapshotFound(firstSnapshot))
} finally {
// Cleanup:
// The succeeding tests will fail unless the promise is fulfilled.
timeConsumingWriteSnapshotPolicy.trySuccess()
}
}

Expand All @@ -155,7 +166,8 @@ class ShardSnapshotStoreFailureSpec
val metadata = EntitySnapshotMetadata(entityId, LogEntryIndex(1))
val snapshot = EntitySnapshot(metadata, EntityState(DummyState))

Using(new TimeConsumingWriteSnapshotPolicy()) { timeConsumingWriteSnapshotPolicy =>
val timeConsumingWriteSnapshotPolicy = new TimeConsumingWriteSnapshotPolicy()
try {
// Prepare: SnapshotStore is saving the snapshot
snapshotTestKit.withPolicy(timeConsumingWriteSnapshotPolicy)
shardSnapshotStore ! SaveSnapshot(snapshot, replyTo = testActor)
Expand All @@ -168,6 +180,10 @@ class ShardSnapshotStoreFailureSpec
shardSnapshotStore ! SaveSnapshot(snapshot, replyTo = testActor)
}
expectNoMessage()
} finally {
// Cleanup:
// The succeeding tests will fail unless the promise is fulfilled.
timeConsumingWriteSnapshotPolicy.trySuccess()
}
}

Expand Down