RaftActor might delete committed entries #152

Closed
xirc opened this issue Apr 27, 2022 · 3 comments · Fixed by #151
Labels: bug (Something isn't working), risk (Bad things that haven't occurred yet)

xirc commented Apr 27, 2022

The following errors occurred during failure-injection tests (for example, taking one AZ down):

ERROR   akka.actor.OneForOneStrategy    xxx-akka.actor.internal-dispatcher-4  akka://xxx/system/sharding/raft-shard-xxx-replica-group-2/1/1 -       Term not found at lastApplied: 7317     java.lang.IllegalStateException: Term not found at lastApplied: 7317
    at lerna.akka.entityreplication.raft.RaftMemberData.resolveSnapshotTargets(RaftMemberData.scala:416)
    at lerna.akka.entityreplication.raft.RaftMemberData.resolveSnapshotTargets$(RaftMemberData.scala:405)
    at lerna.akka.entityreplication.raft.RaftMemberDataImpl.resolveSnapshotTargets(RaftMemberData.scala:539)
    at lerna.akka.entityreplication.raft.RaftActor.handleSnapshotTick(RaftActor.scala:496)
    at lerna.akka.entityreplication.raft.Follower$$anonfun$followerBehavior$1.applyOrElse(Follower.scala:35)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at lerna.akka.entityreplication.raft.RaftActor.akka$persistence$Eventsourced$$super$aroundReceive(RaftActor.scala:124)
    at akka.persistence.Eventsourced$$anon$4.stateReceive(Eventsourced.scala:923)
    at akka.persistence.Eventsourced.aroundReceive$$original(Eventsourced.scala:251)
    at akka.persistence.Eventsourced.aroundReceive(Eventsourced.scala:148)
    at akka.persistence.Eventsourced.aroundReceive$(Eventsourced.scala:250)
    at lerna.akka.entityreplication.raft.RaftActor.aroundReceive(RaftActor.scala:124)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke$$original(ActorCell.scala:548)
    at akka.actor.ActorCell.invoke(ActorCell.scala:61)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run$$original(Mailbox.scala:231)
    at akka.dispatch.Mailbox.run(Mailbox.scala:32)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
ERROR   akka.actor.OneForOneStrategy    xxx-akka.actor.internal-dispatcher-14 akka://xxx/system/sharding/raft-shard-xxx-replica-group-3/2/2      -       requirement failed: Cannot select the entries (1-7610) unless RaftActor have applied the entries to the entities (lastApplied: 0)       java.lang.IllegalArgumentException: requirement failed: Cannot select the entries (1-7610) unless RaftActor have applied the entries to the entities (lastApplied: 0)
    at scala.Predef$.require(Predef.scala:337)
    at lerna.akka.entityreplication.raft.RaftMemberData.selectEntityEntries(RaftMemberData.scala:375)
    at lerna.akka.entityreplication.raft.RaftMemberData.selectEntityEntries$(RaftMemberData.scala:368)
    at lerna.akka.entityreplication.raft.RaftMemberDataImpl.selectEntityEntries(RaftMemberData.scala:539)
    at lerna.akka.entityreplication.raft.RaftActor.receiveFetchEntityEvents(RaftActor.scala:164)
    at lerna.akka.entityreplication.raft.Follower$$anonfun$followerBehavior$1.applyOrElse(Follower.scala:32)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at lerna.akka.entityreplication.raft.RaftActor.akka$persistence$Eventsourced$$super$aroundReceive(RaftActor.scala:124)
    at akka.persistence.Eventsourced$$anon$4.stateReceive(Eventsourced.scala:923)
    at akka.persistence.Eventsourced.aroundReceive$$original(Eventsourced.scala:251)
    at akka.persistence.Eventsourced.aroundReceive(Eventsourced.scala:148)
    at akka.persistence.Eventsourced.aroundReceive$(Eventsourced.scala:250)
    at lerna.akka.entityreplication.raft.RaftActor.aroundReceive(RaftActor.scala:124)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke$$original(ActorCell.scala:548)
    at akka.actor.ActorCell.invoke(ActorCell.scala:61)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run$$original(Mailbox.scala:231)
    at akka.dispatch.Mailbox.run(Mailbox.scala:32)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
xirc added the bug and risk labels Apr 27, 2022
xirc added this to the v2.1.1 milestone Apr 27, 2022

xirc commented Apr 27, 2022

The related code is the following:

def selectEntityEntries(
    entityId: NormalizedEntityId,
    from: LogEntryIndex,
    to: LogEntryIndex,
): Seq[LogEntry] = {
  require(
    to <= lastApplied,
    s"Cannot select the entries (${from}-${to}) unless RaftActor have applied the entries to the entities (lastApplied: ${lastApplied})",
  )
  replicatedLog.sliceEntries(from, to).filter(_.event.entityId.contains(entityId))
}

def resolveSnapshotTargets(): (Term, LogEntryIndex, Set[NormalizedEntityId]) = {
  replicatedLog.termAt(lastApplied) match {
    case Some(lastAppliedTerm) =>
      val entityIds =
        replicatedLog
          .sliceEntries(lastSnapshotStatus.snapshotLastLogIndex.next(), lastApplied)
          .flatMap(_.event.entityId.toSeq)
          .toSet
      (lastAppliedTerm, lastApplied, entityIds)
    case None =>
      // This exception is not thrown unless there is a bug
      throw new IllegalStateException(s"Term not found at lastApplied: $lastApplied")
  }
}


xirc commented Apr 27, 2022

We added some debug logs and traced the state changes of one RaftActor:

xxx:36:20.707 === [Follower] Applying event [FollowedLeaderCommit]. state diff: [commitIndex: 6955 -> 6966, lastApplied: 6955 -> 6966] ===
xxx:36:20.781 === [Follower] Applying event [AppendedEntries]. state diff: [replicatedLog: ReplicatedLog(ancestorTerm=Term(2), ancestorIndex=5824, 1099 entries with indices Some(5825)...Some(6923)) -> ReplicatedLog(ancestorTerm=Term(2), ancestorIndex=5824, 1115 entries with indices Some(5825)...Some(6939))] ===
xxx:36:20.781 === [Follower] Applying event [FollowedLeaderCommit]. state diff: - ===
xxx:36:20.781 === [Follower] Applying event [BecameFollower]. state diff: - ===
xxx:36:20.795 === [Follower] Applying event [AppendedEntries]. state diff: [replicatedLog: ReplicatedLog(ancestorTerm=Term(2), ancestorIndex=5824, 1115 entries with indices Some(5825)...Some(6939)) -> ReplicatedLog(ancestorTerm=Term(2), ancestorIndex=5824, 1131 entries with indices Some(5825)...Some(6955))] ===
xxx:36:20.795 === [Follower] Applying event [FollowedLeaderCommit]. state diff: - ===
xxx:36:20.795 === [Follower] Applying event [BecameFollower]. state diff: - ===
xxx:36:20.802 === [Follower] Applying event [AppendedEntries]. state diff: [replicatedLog: ReplicatedLog(ancestorTerm=Term(2), ancestorIndex=5824, 1131 entries with indices Some(5825)...Some(6955)) -> ReplicatedLog(ancestorTerm=Term(2), ancestorIndex=5824, 1019 entries with indices Some(5825)...Some(6843))] ===

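As the logs above show, the RaftActor deleted log entries, including committed ones: the last AppendedEntries event shrank the log from indices 5825...6955 to 5825...6843 (112 entries dropped), although commitIndex and lastApplied had already reached 6966.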


xirc commented Apr 27, 2022

RaftActor persists the entire AppendEntries.entries, unmodified, as an AppendedEntries event. Applying that event might truncate existing entries, including committed ones.

Instead, RaftActor should

  • truncate existing entries only if the received entries conflict with existing ones
  • truncate no entries if the received entries contain only entries that already exist

or the replay of the AppendedEntries event should handle these cases.
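
The two rules above could be sketched roughly as follows. This is only an illustration, not the fix merged for this issue: `LogEntry` here is a simplified stand-in with plain `Int` index and term, `ConflictAwareMerge` is a hypothetical name, and the sketch assumes that the prevLogIndex/prevLogTerm check has already passed, that the received entries are contiguous, and that log compaction can be ignored.

```scala
// Simplified stand-in for the library's LogEntry (hypothetical, for illustration only).
final case class LogEntry(index: Int, term: Int)

object ConflictAwareMerge {

  /** Merges received entries into the existing log, truncating only on a real conflict. */
  def merge(existing: Seq[LogEntry], received: Seq[LogEntry]): Seq[LogEntry] = {
    val termByIndex = existing.map(e => e.index -> e.term).toMap
    // Position of the first received entry that is either new or has a different term
    // than the existing entry at the same index.
    val firstDivergent = received.indexWhere(e => !termByIndex.get(e.index).contains(e.term))
    if (firstDivergent < 0) {
      // Every received entry already exists with the same term: truncate nothing.
      existing
    } else {
      val toAppend = received.drop(firstDivergent)
      // Drop existing entries from the first divergent index onward, then append the rest.
      existing.takeWhile(_.index < toAppend.head.index) ++ toAppend
    }
  }
}
```

With an existing log of indices 1 to 10, receiving entries 4 to 6 with matching terms leaves the log untouched, while receiving an entry at index 8 with a different term replaces index 8 and everything after it.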

case appendEntries: AppendEntries =>
  if (currentData.hasMatchLogEntry(appendEntries.prevLogIndex, appendEntries.prevLogTerm)) {
    if (log.isDebugEnabled) log.debug("=== [Follower] append {} ===", appendEntries)
    cancelElectionTimeoutTimer()
    if (appendEntries.entries.isEmpty && appendEntries.term == currentData.currentTerm) {
      // do not persist event when no need
      applyDomainEvent(FollowedLeaderCommit(appendEntries.leader, appendEntries.leaderCommit)) { _ =>
        sender() ! AppendEntriesSucceeded(
          appendEntries.term,
          currentData.replicatedLog.lastLogIndex,
          selfMemberIndex,
        )
        become(Follower)
      }
    } else {
      applyDomainEvent(AppendedEntries(appendEntries.term, appendEntries.entries, appendEntries.prevLogIndex)) {
        domainEvent =>
          applyDomainEvent(FollowedLeaderCommit(appendEntries.leader, appendEntries.leaderCommit)) { _ =>
            sender() ! AppendEntriesSucceeded(
              domainEvent.term,
              currentData.replicatedLog.lastLogIndex,
              selfMemberIndex,
            )
            become(Follower)
          }
      }
    }

def hasMatchLogEntry(prevLogIndex: LogEntryIndex, prevLogTerm: Term): Boolean = {
  // When the leader has no log, it sends LogEntryIndex.initial.
  // We want AppendEntries to be treated as successful in that case,
  // so when prevLogIndex is LogEntryIndex.initial we assume that a matching log entry exists.
  prevLogIndex == LogEntryIndex.initial() || replicatedLog.termAt(prevLogIndex).contains(prevLogTerm)
}

case AppendedEntries(term, logEntries, prevLogIndex) =>
  currentData
    .syncTerm(term)
    .appendEntries(logEntries, prevLogIndex)

def appendEntries(logEntries: Seq[LogEntry], prevLogIndex: LogEntryIndex): RaftMemberData = {
  updatePersistentState(
    replicatedLog = replicatedLog.merge(logEntries, prevLogIndex),
  )
}

def merge(thatEntries: Seq[LogEntry], prevLogIndex: LogEntryIndex): ReplicatedLog = {
  val newEntries = this.entries.takeWhile(_.index <= prevLogIndex) ++ thatEntries
  copy(newEntries)
}
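
To make the effect of `merge` concrete, here is a minimal reproduction sketch. It copies only the shape of `merge` shown above onto simplified stand-in types (plain `Int` index and term, no compaction); `MergeTruncationDemo` and the scenario of an AppendEntries that carries only already-stored entries are assumptions for illustration, not taken from the test run.

```scala
// Simplified stand-ins for the library's types (hypothetical, for illustration only).
final case class LogEntry(index: Int, term: Int)

final case class ReplicatedLog(entries: Seq[LogEntry]) {
  // Same shape as the merge above: keep entries up to prevLogIndex, then append the received ones.
  def merge(thatEntries: Seq[LogEntry], prevLogIndex: Int): ReplicatedLog = {
    val newEntries = entries.takeWhile(_.index <= prevLogIndex) ++ thatEntries
    copy(newEntries)
  }

  // Index of the last entry, or 0 for an empty log.
  def lastLogIndex: Int = entries.lastOption.map(_.index).getOrElse(0)
}

object MergeTruncationDemo {
  // The follower's log holds indices 1 to 10, all in term 2.
  val log: ReplicatedLog = ReplicatedLog((1 to 10).map(LogEntry(_, 2)))

  // An AppendEntries message that carries only entries 4 to 6, which the follower already has.
  val received: Seq[LogEntry] = (4 to 6).map(LogEntry(_, 2))

  // prevLogIndex = 3 matches, so the message is accepted and merged.
  val merged: ReplicatedLog = log.merge(received, prevLogIndex = 3)

  def main(args: Array[String]): Unit =
    println(s"lastLogIndex before=${log.lastLogIndex}, after=${merged.lastLogIndex}")
}
```

In this sketch entries 7 to 10 disappear even though nothing in the received entries conflicts with them. If any of them had been committed, the log would end below commitIndex, which matches the pattern in the debug logs.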
