Skip to content

Commit

Permalink
RaftActor should recover by reading AppendedEntries_V2_1_0 for backwa…
Browse files Browse the repository at this point in the history
…rd compatibility
  • Loading branch information
Taichi Yamakawa committed Apr 27, 2022
1 parent 02ce35f commit 040c577
Showing 1 changed file with 75 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,36 @@ package lerna.akka.entityreplication.raft
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import akka.actor.typed.scaladsl.adapter._
import akka.actor.{ typed, ActorSystem }
import akka.persistence.testkit.scaladsl.PersistenceTestKit
import akka.testkit.{ TestKit, TestProbe }
import com.typesafe.config.ConfigFactory
import lerna.akka.entityreplication.model.NormalizedEntityId
import lerna.akka.entityreplication.raft.RaftActor.Follower
import lerna.akka.entityreplication.raft.RaftActor.{ AppendedEntries_V2_1_0, Follower }
import lerna.akka.entityreplication.raft.RaftProtocol._
import lerna.akka.entityreplication.raft.eventsourced.CommitLogStoreActor
import lerna.akka.entityreplication.raft.model._
import lerna.akka.entityreplication.raft.protocol.RaftCommands.InstallSnapshot
import lerna.akka.entityreplication.raft.protocol.{ FetchEntityEvents, FetchEntityEventsResponse }
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol._

import scala.annotation.nowarn

object RaftActorSpec {
final case object DummyEntityState
}

class RaftActorSpec extends TestKit(ActorSystem()) with RaftActorSpecBase {
class RaftActorSpec
extends TestKit(ActorSystem("RaftActorSpec", RaftActorSpecBase.configWithPersistenceTestKits))
with RaftActorSpecBase {
import RaftActorSpec._

private implicit val typedSystem: typed.ActorSystem[Nothing] = system.toTyped
private val persistenceTestKit = PersistenceTestKit(system)

override def beforeEach(): Unit = {
super.beforeEach()
persistenceTestKit.clearAll()
}

"RaftActor receiving FetchEntityEvents" should {
"reply FetchEntityEventsResponse" in {
Expand Down Expand Up @@ -546,4 +557,66 @@ class RaftActorSpec extends TestKit(ActorSystem()) with RaftActorSpecBase {
}

}

"RaftActor" should {

"recover by reading AppendedEntries_V2_1_0 for backward compatibility" in {
val shardId = createUniqueShardId()
val followerMemberIndex = createUniqueMemberIndex()

val followerPersistenceId = raftActorPersistenceId(shardId = shardId, selfMemberIndex = followerMemberIndex)
persistenceTestKit.persistForRecovery(
followerPersistenceId,
Seq(
AppendedEntries_V2_1_0(
Term(1),
Seq(
LogEntry(LogEntryIndex(1), EntityEvent(None, NoOp), Term(1)),
LogEntry(LogEntryIndex(2), EntityEvent(Some(NormalizedEntityId.from("1")), "event-1"), Term(1)),
),
LogEntryIndex(0),
): @nowarn,
AppendedEntries_V2_1_0(
Term(1),
Seq(
LogEntry(LogEntryIndex(3), EntityEvent(Some(NormalizedEntityId.from("1")), "event-2"), Term(1)),
),
LogEntryIndex(2),
): @nowarn,
AppendedEntries_V2_1_0(
Term(2),
Seq(
LogEntry(LogEntryIndex(3), EntityEvent(None, NoOp), Term(2)),
LogEntry(LogEntryIndex(4), EntityEvent(Some(NormalizedEntityId.from("2")), "event-1"), Term(2)),
LogEntry(LogEntryIndex(5), EntityEvent(Some(NormalizedEntityId.from("2")), "event-2"), Term(2)),
),
LogEntryIndex(2),
),
AppendedEntries_V2_1_0(
Term(2),
Seq(
LogEntry(LogEntryIndex(4), EntityEvent(Some(NormalizedEntityId.from("2")), "event-1"), Term(2)),
),
LogEntryIndex(3),
): @nowarn,
),
)
val expectedEntriesAfterRecover = Seq(
LogEntry(LogEntryIndex(1), EntityEvent(None, NoOp), Term(1)),
LogEntry(LogEntryIndex(2), EntityEvent(Some(NormalizedEntityId.from("1")), "event-1"), Term(1)),
LogEntry(LogEntryIndex(3), EntityEvent(None, NoOp), Term(2)),
LogEntry(LogEntryIndex(4), EntityEvent(Some(NormalizedEntityId.from("2")), "event-1"), Term(2)),
)

val follower = createRaftActor(
shardId = shardId,
selfMemberIndex = followerMemberIndex,
)
val recoveredLogEntries = getState(follower).stateData.replicatedLog.entries
recoveredLogEntries should contain theSameElementsInOrderAs expectedEntriesAfterRecover
recoveredLogEntries.map(_.event) should contain theSameElementsInOrderAs expectedEntriesAfterRecover.map(_.event)
}

}

}

0 comments on commit 040c577

Please sign in to comment.