Skip to content

Commit

Permalink
🐞fix: Entity recovers its state as of the moment RaftActor sends Activate
Browse files Browse the repository at this point in the history
RaftActor can send a TakeSnapshot message during entity recovery.
The index of the snapshot is determined by lastApplied at the time TakeSnapshot is sent.
RaftActor can provide new events to the entity during entity recovery, and this process increments lastApplied.
The RecoveryState message used to recover the entity is created from lastApplied.

Together, these can cause a mismatch between the index of the snapshot and the recovered state of the entity.
For more details, see #111.

The Activate message allows RaftActor to restore the entity's state as of the moment RaftActor creates the entity.
  • Loading branch information
negokaz committed Dec 29, 2021
1 parent f9764e6 commit 0adb893
Show file tree
Hide file tree
Showing 19 changed files with 394 additions and 205 deletions.
117 changes: 74 additions & 43 deletions src/main/scala/lerna/akka/entityreplication/ReplicationActor.scala
Original file line number Diff line number Diff line change
@@ -1,20 +1,36 @@
package lerna.akka.entityreplication

import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{ Actor, ActorRef, Cancellable, Props, Stash }
import akka.actor.typed.scaladsl.adapter._

import akka.actor.{ Actor, Cancellable, Stash }
import java.util.concurrent.atomic.AtomicInteger
import akka.event.Logging
import lerna.akka.entityreplication.model.{ EntityInstanceId, NormalizedEntityId }
import lerna.akka.entityreplication.raft.RaftProtocol
import lerna.akka.entityreplication.raft.RaftProtocol._
import lerna.akka.entityreplication.raft.model.{ LogEntryIndex, NoOp }
import lerna.akka.entityreplication.raft.protocol.SnapshotOffer
import lerna.akka.entityreplication.raft.protocol.{ FetchEntityEvents, FetchEntityEventsResponse, SnapshotOffer }
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol._

/** Companion object holding helpers used by the [[ReplicationActor]] trait. */
private[entityreplication] object ReplicationActor {

// Counter backing generateInstanceId(); the first value handed out is 1.
private[this] val instanceIdCounter = new AtomicInteger(1)

/** Returns a new [[EntityInstanceId]] by atomically reading and incrementing the counter. */
private def generateInstanceId(): EntityInstanceId = EntityInstanceId(instanceIdCounter.getAndIncrement())

// NOTE(review): this factory is named after FetchSnapshot, but the actor it builds handles
// FetchEntityEventsResponse; the two names look inconsistent — confirm before renaming.
private object FetchSnapshotResponseMapper {
/** Builds the Props of a FetchEntityEventsResponseMapper that replies to `replyTo` with `snapshot` attached. */
def props(replyTo: ActorRef, snapshot: Option[EntitySnapshot]): Props =
Props(new FetchEntityEventsResponseMapper(replyTo, snapshot))
}

/** One-shot actor that turns a FetchEntityEventsResponse into a RecoveryState message.
  *
  * @param replyTo  receiver of the resulting RecoveryState
  * @param snapshot snapshot (if any) forwarded unchanged inside the RecoveryState
  */
private class FetchEntityEventsResponseMapper(replyTo: ActorRef, snapshot: Option[EntitySnapshot]) extends Actor {
override def receive: Receive = {
case FetchEntityEventsResponse(events) =>
// Combine the fetched events with the snapshot captured at construction time.
replyTo ! RaftProtocol.RecoveryState(events, snapshot)
// The mapper serves exactly one response, then stops itself.
context.stop(self)
}
}
}

@deprecated(message = "Use typed.ReplicatedEntityBehavior instead", since = "2.0.0")
Expand All @@ -25,6 +41,8 @@ trait ReplicationActor[StateData] extends Actor with Stash with akka.lerna.Stash

private val instanceId = ReplicationActor.generateInstanceId()

private val entityId = NormalizedEntityId.of(self.path)

private[this] val settings = ClusterReplicationSettings.create(context.system)

private[this] val log = Logging(context.system, this)
Expand All @@ -35,51 +53,64 @@ trait ReplicationActor[StateData] extends Actor with Stash with akka.lerna.Stash
def stateReceive(receive: Receive, message: Any): Unit
}

override def aroundPreStart(): Unit = {
super.aroundPreStart()
requestRecovery()
// State in which the entity waits for an Activate message before starting recovery.
private[this] val inactive: State = new State {
override def stateReceive(receive: Receive, message: Any): Unit =
message match {
// Activate supplies the snapshot store and the recovery index passed on to recovering(...).
case Activate(shardSnapshotStore, recoveryIndex) =>
changeState(recovering(shardSnapshotStore, recoveryIndex))
case _ =>
// Every other message is stashed rather than handled while inactive.
internalStash.stash()
}
}

override def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
super.aroundPreRestart(reason, message)
requestRecovery()
}
private[this] def recovering(shardSnapshotStore: ActorRef, recoveryIndex: LogEntryIndex): State =
new State {

private[this] def requestRecovery(): Unit = {
context.parent ! RequestRecovery(NormalizedEntityId.of(self.path))
}
private[this] val recoveryTimeoutTimer: Cancellable =
context.system.scheduler.scheduleOnce(settings.recoveryEntityTimeout, self, RecoveryTimeout)

private[this] val recovering: State = new State {
shardSnapshotStore ! SnapshotProtocol.FetchSnapshot(entityId, self)

private[this] val recoveryTimeoutTimer: Cancellable =
context.system.scheduler.scheduleOnce(settings.recoveryEntityTimeout, self, RecoveryTimeout)
override def stateReceive(receive: Receive, message: Any): Unit =
message match {
case RecoveryTimeout =>
// to restart
// TODO: BackoffSupervisor を使ってカスケード障害を回避する
if (log.isInfoEnabled)
log.info("Entity (name: {}) recovering timed out. It will be retried later.", self.path.name)
throw EntityRecoveryTimeoutException(self.path)

case found: SnapshotProtocol.SnapshotFound =>
fetchEntityEvents(snapshotIndex = found.snapshot.metadata.logEntryIndex, Option(found.snapshot))
case _: SnapshotProtocol.SnapshotNotFound =>
fetchEntityEvents(snapshotIndex = LogEntryIndex.initial(), None)

case RecoveryState(logEntries, maybeSnapshot) =>
recoveryTimeoutTimer.cancel()
maybeSnapshot.foreach { snapshot =>
innerApplyEvent(
SnapshotOffer(snapshot.state.underlying),
snapshot.metadata.logEntryIndex,
)
}
logEntries.foreach { logEntry =>
innerApplyEvent(logEntry.event.event, logEntry.index)
}
changeState(ready)
internalStash.unstashAll()
case _ =>
internalStash.stash()
}

override def stateReceive(receive: Receive, message: Any): Unit =
message match {
case RecoveryTimeout =>
// to restart
// TODO: BackoffSupervisor を使ってカスケード障害を回避する
if (log.isInfoEnabled)
log.info("Entity (name: {}) recovering timed out. It will be retried later.", self.path.name)
throw EntityRecoveryTimeoutException(self.path)

case RecoveryState(logEntries, maybeSnapshot) =>
recoveryTimeoutTimer.cancel()
maybeSnapshot.foreach { snapshot =>
innerApplyEvent(
SnapshotOffer(snapshot.state.underlying),
snapshot.metadata.logEntryIndex,
)
}
logEntries.foreach { logEntry =>
innerApplyEvent(logEntry.event.event, logEntry.index)
}
changeState(ready)
internalStash.unstashAll()
case _ =>
internalStash.stash()
def fetchEntityEvents(snapshotIndex: LogEntryIndex, snapshot: Option[EntitySnapshot]): Unit = {
context.parent ! FetchEntityEvents(
entityId,
from = snapshotIndex.next(),
to = recoveryIndex,
context.actorOf(ReplicationActor.FetchSnapshotResponseMapper.props(self, snapshot)),
)
}
}
}

private[this] val ready: State = new State {

Expand Down Expand Up @@ -132,7 +163,7 @@ trait ReplicationActor[StateData] extends Actor with Stash with akka.lerna.Stash

def currentState: StateData

private[this] var replicationState: State = recovering
private[this] var replicationState: State = inactive
private[this] def changeState(state: State): Unit = replicationState = state

override def aroundReceive(receive: Receive, msg: Any): Unit =
Expand All @@ -143,7 +174,7 @@ trait ReplicationActor[StateData] extends Actor with Stash with akka.lerna.Stash
context.parent ! Replicate(
event,
replyTo = self,
NormalizedEntityId.of(self.path),
entityId,
instanceId,
originSender = sender(),
)
Expand Down
36 changes: 17 additions & 19 deletions src/main/scala/lerna/akka/entityreplication/raft/Candidate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,23 @@ private[raft] trait Candidate { this: RaftActor =>
become(Candidate)
}

case request: RequestVote => receiveRequestVote(request)
case response: RequestVoteResponse => receiveRequestVoteResponse(response)
case request: AppendEntries => receiveAppendEntries(request)
case request: InstallSnapshot => receiveInstallSnapshot(request)
case _: InstallSnapshotResponse => // ignore, because I'm not a leader
case response: SnapshotSyncManager.Response => receiveSyncSnapshotResponse(response)
case command: Command => handleCommand(command)
case _: ForwardedCommand => // ignore, because I'm not a leader
case TryCreateEntity(_, entityId) => createEntityIfNotExists(entityId)
case request: FetchEntityEvents => receiveFetchEntityEvents(request)
case RequestRecovery(entityId) => recoveryEntity(entityId)
case response: SnapshotProtocol.FetchSnapshotResponse => receiveFetchSnapshotResponse(response)
case EntityTerminated(id) => receiveEntityTerminated(id)
case SuspendEntity(_, entityId, stopMessage) => suspendEntity(entityId, stopMessage)
case SnapshotTick => handleSnapshotTick()
case response: Snapshot => receiveEntitySnapshotResponse(response)
case response: SnapshotProtocol.SaveSnapshotResponse => receiveSaveSnapshotResponse(response)
case _: akka.persistence.SaveSnapshotSuccess => // ignore
case _: akka.persistence.SaveSnapshotFailure => // ignore: no problem because events exist even if snapshot saving failed
case request: RequestVote => receiveRequestVote(request)
case response: RequestVoteResponse => receiveRequestVoteResponse(response)
case request: AppendEntries => receiveAppendEntries(request)
case request: InstallSnapshot => receiveInstallSnapshot(request)
case _: InstallSnapshotResponse => // ignore, because I'm not a leader
case response: SnapshotSyncManager.Response => receiveSyncSnapshotResponse(response)
case command: Command => handleCommand(command)
case _: ForwardedCommand => // ignore, because I'm not a leader
case TryCreateEntity(_, entityId) => createEntityIfNotExists(entityId)
case request: FetchEntityEvents => receiveFetchEntityEvents(request)
case EntityTerminated(id) => receiveEntityTerminated(id)
case SuspendEntity(_, entityId, stopMessage) => suspendEntity(entityId, stopMessage)
case SnapshotTick => handleSnapshotTick()
case response: Snapshot => receiveEntitySnapshotResponse(response)
case response: SnapshotProtocol.SaveSnapshotResponse => receiveSaveSnapshotResponse(response)
case _: akka.persistence.SaveSnapshotSuccess => // ignore
case _: akka.persistence.SaveSnapshotFailure => // ignore: no problem because events exist even if snapshot saving failed
}

private[this] def receiveRequestVote(request: RequestVote): Unit =
Expand Down
34 changes: 16 additions & 18 deletions src/main/scala/lerna/akka/entityreplication/raft/Follower.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,22 @@ private[raft] trait Follower { this: RaftActor =>
}
requestVote(currentData)

case request: RequestVote => receiveRequestVote(request)
case request: AppendEntries => receiveAppendEntries(request)
case request: InstallSnapshot => receiveInstallSnapshot(request)
case _: InstallSnapshotResponse => // ignore, because I'm not a leader
case response: SnapshotSyncManager.Response => receiveSyncSnapshotResponse(response)
case command: Command => handleCommand(command)
case _: ForwardedCommand => // ignore, because I'm not a leader
case TryCreateEntity(_, entityId) => createEntityIfNotExists(entityId)
case request: FetchEntityEvents => receiveFetchEntityEvents(request)
case RequestRecovery(entityId) => recoveryEntity(entityId)
case response: SnapshotProtocol.FetchSnapshotResponse => receiveFetchSnapshotResponse(response)
case EntityTerminated(id) => receiveEntityTerminated(id)
case SuspendEntity(_, entityId, stopMessage) => suspendEntity(entityId, stopMessage)
case SnapshotTick => handleSnapshotTick()
case response: Snapshot => receiveEntitySnapshotResponse(response)
case response: SnapshotProtocol.SaveSnapshotResponse => receiveSaveSnapshotResponse(response)
case _: akka.persistence.SaveSnapshotSuccess => // ignore
case _: akka.persistence.SaveSnapshotFailure => // ignore: no problem because events exist even if snapshot saving failed
case request: RequestVote => receiveRequestVote(request)
case request: AppendEntries => receiveAppendEntries(request)
case request: InstallSnapshot => receiveInstallSnapshot(request)
case _: InstallSnapshotResponse => // ignore, because I'm not a leader
case response: SnapshotSyncManager.Response => receiveSyncSnapshotResponse(response)
case command: Command => handleCommand(command)
case _: ForwardedCommand => // ignore, because I'm not a leader
case TryCreateEntity(_, entityId) => createEntityIfNotExists(entityId)
case request: FetchEntityEvents => receiveFetchEntityEvents(request)
case EntityTerminated(id) => receiveEntityTerminated(id)
case SuspendEntity(_, entityId, stopMessage) => suspendEntity(entityId, stopMessage)
case SnapshotTick => handleSnapshotTick()
case response: Snapshot => receiveEntitySnapshotResponse(response)
case response: SnapshotProtocol.SaveSnapshotResponse => receiveSaveSnapshotResponse(response)
case _: akka.persistence.SaveSnapshotSuccess => // ignore
case _: akka.persistence.SaveSnapshotFailure => // ignore: no problem because events exist even if snapshot saving failed
}

private[this] def receiveRequestVote(request: RequestVote): Unit =
Expand Down
2 changes: 0 additions & 2 deletions src/main/scala/lerna/akka/entityreplication/raft/Leader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ private[raft] trait Leader { this: RaftActor =>
case ReplicationRegion.Passivate(entityPath, stopMessage) => startEntityPassivationProcess(entityPath, stopMessage)
case TryCreateEntity(_, entityId) => createEntityIfNotExists(entityId)
case request: FetchEntityEvents => receiveFetchEntityEvents(request)
case RequestRecovery(entityId) => recoveryEntity(entityId)
case response: SnapshotProtocol.FetchSnapshotResponse => receiveFetchSnapshotResponse(response)
case EntityTerminated(id) => receiveEntityTerminated(id)
case SuspendEntity(_, entityId, stopMessage) => suspendEntity(entityId, stopMessage)
case SnapshotTick => handleSnapshotTick()
Expand Down
23 changes: 4 additions & 19 deletions src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,23 +140,6 @@ private[raft] class RaftActor(
request.replyTo ! FetchEntityEventsResponse(logEntries)
}

protected[akka] def recoveryEntity(entityId: NormalizedEntityId): Unit = {
shardSnapshotStore ! SnapshotProtocol.FetchSnapshot(entityId, replyTo = self)
}

protected[this] def receiveFetchSnapshotResponse(response: SnapshotProtocol.FetchSnapshotResponse): Unit =
response match {
case SnapshotProtocol.SnapshotFound(snapshot) =>
val alreadyAppliedEntries = currentData.selectAlreadyAppliedEntries(
snapshot.metadata.entityId,
from = snapshot.metadata.logEntryIndex.next(),
)
replicationActor(snapshot.metadata.entityId) ! RecoveryState(alreadyAppliedEntries, Option(snapshot))
case SnapshotProtocol.SnapshotNotFound(entityId) =>
val alreadyAppliedEntries = currentData.selectAlreadyAppliedEntries(entityId)
replicationActor(entityId) ! RecoveryState(alreadyAppliedEntries, None)
}

protected[this] def replicationActor(entityId: NormalizedEntityId): ActorRef = {
context.child(entityId.underlying).getOrElse {
if (log.isDebugEnabled)
Expand All @@ -165,8 +148,10 @@ private[raft] class RaftActor(
currentState,
entityId,
)
val props = replicationActorProps(new ReplicationActorContext(entityId.raw, self))
context.watchWith(context.actorOf(props, entityId.underlying), EntityTerminated(entityId))
val props = replicationActorProps(new ReplicationActorContext(entityId.raw, self))
val entity = context.watchWith(context.actorOf(props, entityId.underlying), EntityTerminated(entityId))
entity ! Activate(shardSnapshotStore, recoveryIndex = currentData.lastApplied)
entity
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import lerna.akka.entityreplication.typed.ClusterReplication.ShardCommand
private[entityreplication] object RaftProtocol {

sealed trait RaftActorCommand extends ShardCommand
final case class RequestRecovery(entityId: NormalizedEntityId) extends RaftActorCommand
final case class Command(command: Any) extends RaftActorCommand with ClusterReplicationSerializable
final case class ForwardedCommand(command: Command) extends RaftActorCommand with ClusterReplicationSerializable
final case class Snapshot(metadata: EntitySnapshotMetadata, state: EntityState) extends RaftActorCommand
Expand Down Expand Up @@ -46,6 +45,8 @@ private[entityreplication] object RaftProtocol {

sealed trait EntityCommand

final case class Activate(shardSnapshotStore: ActorRef, recoveryIndex: LogEntryIndex) extends EntityCommand
final case class ApplySnapshot(entitySnapshot: Option[EntitySnapshot]) extends EntityCommand
final case class RecoveryState(events: Seq[LogEntry], snapshot: Option[EntitySnapshot]) extends EntityCommand
final case class ProcessCommand(command: Any) extends EntityCommand
final case class Replica(logEntry: LogEntry) extends EntityCommand
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package lerna.akka.entityreplication.raft.protocol

import akka.actor.typed.ActorRef
import lerna.akka.entityreplication.model.{ NormalizedEntityId, NormalizedShardId }
import lerna.akka.entityreplication.model.NormalizedEntityId
import lerna.akka.entityreplication.raft.model.{ LogEntry, LogEntryIndex }
import lerna.akka.entityreplication.typed.ClusterReplication.ShardCommand

private[entityreplication] final case class FetchEntityEvents(
entityId: NormalizedEntityId,
from: LogEntryIndex,
to: LogEntryIndex,
replyTo: ActorRef[FetchEntityEventsResponse],
)
) extends ShardCommand

/** Reply to [[FetchEntityEvents]], sent to its `replyTo`.
  *
  * @param events the log entries returned for the request
  */
private[entityreplication] final case class FetchEntityEventsResponse(events: Seq[LogEntry])
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,23 @@ package lerna.akka.entityreplication.testkit
import akka.actor.{ Actor, Props, Terminated }
import lerna.akka.entityreplication.ReplicationRegion.Passivate
import lerna.akka.entityreplication.raft.model.LogEntryIndex
import lerna.akka.entityreplication.raft.protocol.{ FetchEntityEvents, FetchEntityEventsResponse }
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol

protected[testkit] class TestReplicationActor(replicationActorProps: Props) extends Actor {
import lerna.akka.entityreplication.raft.RaftProtocol._

private[this] val replicationActor = context.watch(context.actorOf(replicationActorProps))

replicationActor ! Activate(self, recoveryIndex = LogEntryIndex.initial().next())

override def receive: Receive = active(LogEntryIndex(1))

def active(dummyLogEntryIndex: LogEntryIndex): Receive = {
case _: RequestRecovery =>
sender() ! RecoveryState(events = Seq(), snapshot = None)
case fetchSnapshot: SnapshotProtocol.FetchSnapshot =>
fetchSnapshot.replyTo ! SnapshotProtocol.SnapshotNotFound(fetchSnapshot.entityId)
case fetchEvents: FetchEntityEvents =>
fetchEvents.replyTo ! FetchEntityEventsResponse(Seq())
case replicate: Replicate =>
val sender = replicate.originSender.getOrElse(self)
replicate.replyTo.tell(ReplicationSucceeded(replicate.event, dummyLogEntryIndex, replicate.instanceId), sender)
Expand Down
Loading

0 comments on commit 0adb893

Please sign in to comment.