Skip to content

Commit

Permalink
Merge pull request #183 from lerna-stack/snapshot-store-reply-with-sn…
Browse files Browse the repository at this point in the history
…apshot-not-found

SnapshotStore can reply with SnapshotNotFound if it is saving an EntitySnapshot
  • Loading branch information
negokaz authored Oct 28, 2022
2 parents da0458a + 5660bfc commit e7e0585
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 11 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Snapshot synchronization could remove committed log entries that not be included in snapshots
[#167](https://github.com/lerna-stack/akka-entity-replication/issues/167)
[#PR168](https://github.com/lerna-stack/akka-entity-replication/pull/168)
- SnapshotStore doesn't reply with SnapshotNotFound sometimes
[#182](https://github.com/lerna-stack/akka-entity-replication/issues/182),
[#PR183](https://github.com/lerna-stack/akka-entity-replication/pull/183)

## [v2.1.0] - 2022-03-24
[v2.1.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.0.0...v2.1.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,11 @@ private[entityreplication] class SnapshotStore(
cmd.entityId,
)
case FetchSnapshot(_, replyTo) =>
prevSnapshot.foreach { s =>
replyTo ! SnapshotProtocol.SnapshotFound(s)
prevSnapshot match {
case Some(prevSnapshot) =>
replyTo ! SnapshotProtocol.SnapshotFound(prevSnapshot)
case None =>
replyTo ! SnapshotProtocol.SnapshotNotFound(entityId)
}
}
case _: persistence.SaveSnapshotSuccess =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import lerna.akka.entityreplication.raft.{ ActorSpec, RaftSettings }
import lerna.akka.entityreplication.testkit.KryoSerializable

import scala.concurrent.Promise
import scala.util.Using

object ShardSnapshotStoreFailureSpec {
final case object DummyState extends KryoSerializable
Expand Down Expand Up @@ -93,34 +92,41 @@ class ShardSnapshotStoreFailureSpec
"ShardSnapshotStore (with time-consuming writes)" should {

// Emulates a time-consuming write
class TimeConsumingWriteSnapshotPolicy extends SnapshotStorage.SnapshotPolicies.PolicyType with AutoCloseable {
// Note:
// The promise (`processingResultPromise`) must be fulfilled.
// The succeeding tests will fail unless the promise is fulfilled.
class TimeConsumingWriteSnapshotPolicy extends SnapshotStorage.SnapshotPolicies.PolicyType {
val processingResultPromise = Promise[ProcessingResult]()
override def tryProcess(persistenceId: String, processingUnit: SnapshotOperation): ProcessingResult = {
processingUnit match {
case _: WriteSnapshot => processingResultPromise.future.await
case _ => ProcessingSuccess
}
}
override def close(): Unit = {
def trySuccess(): Unit = {
processingResultPromise.trySuccess(ProcessingSuccess)
}
}

"reply with `SnapshotNotFound` to `FetchSnapshot` if it has no EntitySnapshot and is saving an EntitySnapshot" ignore {
// TODO Change SnapshotStore.savingSnapshot such that this test passes.
"reply with `SnapshotNotFound` to `FetchSnapshot` if it has no EntitySnapshot and is saving an EntitySnapshot" in {
val entityId = generateUniqueEntityId()
val shardSnapshotStore = createShardSnapshotStore()
val metadata = EntitySnapshotMetadata(entityId, LogEntryIndex(1))
val snapshot = EntitySnapshot(metadata, EntityState(DummyState))

Using(new TimeConsumingWriteSnapshotPolicy()) { timeConsumingWriteSnapshotPolicy =>
val timeConsumingWriteSnapshotPolicy = new TimeConsumingWriteSnapshotPolicy()
try {
// Prepare: SnapshotStore is saving the snapshot
snapshotTestKit.withPolicy(timeConsumingWriteSnapshotPolicy)
shardSnapshotStore ! SaveSnapshot(snapshot, replyTo = testActor)

// Test:
shardSnapshotStore ! FetchSnapshot(entityId, replyTo = testActor)
expectMsg(SnapshotNotFound)
expectMsg(SnapshotNotFound(entityId))
} finally {
// Cleanup:
// The succeeding tests will fail unless the promise is fulfilled.
timeConsumingWriteSnapshotPolicy.trySuccess()
}
}

Expand All @@ -134,7 +140,8 @@ class ShardSnapshotStoreFailureSpec
shardSnapshotStore ! SaveSnapshot(firstSnapshot, replyTo = testActor)
expectMsg(SaveSnapshotSuccess(firstSnapshotMetadata))

Using(new TimeConsumingWriteSnapshotPolicy()) { timeConsumingWriteSnapshotPolicy =>
val timeConsumingWriteSnapshotPolicy = new TimeConsumingWriteSnapshotPolicy()
try {
// Prepare: SnapshotStore is saving the second snapshot
snapshotTestKit.withPolicy(timeConsumingWriteSnapshotPolicy)
val secondSnapshot =
Expand All @@ -144,6 +151,10 @@ class ShardSnapshotStoreFailureSpec
// Test:
shardSnapshotStore ! FetchSnapshot(entityId, replyTo = testActor)
expectMsg(SnapshotFound(firstSnapshot))
} finally {
// Cleanup:
// The succeeding tests will fail unless the promise is fulfilled.
timeConsumingWriteSnapshotPolicy.trySuccess()
}
}

Expand All @@ -155,7 +166,8 @@ class ShardSnapshotStoreFailureSpec
val metadata = EntitySnapshotMetadata(entityId, LogEntryIndex(1))
val snapshot = EntitySnapshot(metadata, EntityState(DummyState))

Using(new TimeConsumingWriteSnapshotPolicy()) { timeConsumingWriteSnapshotPolicy =>
val timeConsumingWriteSnapshotPolicy = new TimeConsumingWriteSnapshotPolicy()
try {
// Prepare: SnapshotStore is saving the snapshot
snapshotTestKit.withPolicy(timeConsumingWriteSnapshotPolicy)
shardSnapshotStore ! SaveSnapshot(snapshot, replyTo = testActor)
Expand All @@ -168,6 +180,10 @@ class ShardSnapshotStoreFailureSpec
shardSnapshotStore ! SaveSnapshot(snapshot, replyTo = testActor)
}
expectNoMessage()
} finally {
// Cleanup:
// The succeeding tests will fail unless the promise is fulfilled.
timeConsumingWriteSnapshotPolicy.trySuccess()
}
}

Expand Down

0 comments on commit e7e0585

Please sign in to comment.