Skip to content

Commit

Permalink
Merge pull request #162 from lerna-stack/notify-replication-failure-t…
Browse files Browse the repository at this point in the history
…o-conflict-clients

Notify replication failure to conflict clients
  • Loading branch information
negokaz authored Jul 26, 2022
2 parents 695c697 + ee34723 commit e415a69
Show file tree
Hide file tree
Showing 8 changed files with 274 additions and 11 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Leader cannot reply to an entity with a `ReplicationFailed` message in some cases
[#153](https://github.com/lerna-stack/akka-entity-replication/issues/153),
[PR#161](https://github.com/lerna-stack/akka-entity-replication/pull/161)
- An entity could get stuck at `WaitForReplication` when a Raft log entry is truncated due to a conflict
[#155](https://github.com/lerna-stack/akka-entity-replication/issues/155),
[PR#162](https://github.com/lerna-stack/akka-entity-replication/pull/162)
- A RaftActor (Leader) could mis-deliver a `ReplicationSucceeded` message to a different entity
[#156](https://github.com/lerna-stack/akka-entity-replication/issues/156),
[PR#162](https://github.com/lerna-stack/akka-entity-replication/pull/162)

## [v2.1.0] - 2022-03-24
[v2.1.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.0.0...v2.1.0
Expand Down
23 changes: 17 additions & 6 deletions src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,24 @@ private[raft] class RaftActor(
currentData.vote(candidate, term)
case DetectedNewTerm(term) =>
currentData.syncTerm(term)
case AppendedEntries(term, logEntries) =>
case AppendedEntries(term, newLogEntries) =>
currentData
.syncTerm(term)
.truncateAndAppendEntries(logEntries)
.discardConflictClients(
possiblyConflictIndex = newLogEntries.headOption.map(_.index),
conflictClient => {
if (log.isDebugEnabled) {
log.debug(
"[{}] sending ReplicationFailed to [{}], including sender [{}]",
currentState,
conflictClient.ref,
conflictClient.originSender,
)
}
conflictClient.forward(ReplicationFailed)
},
)
.truncateAndAppendEntries(newLogEntries)
case AppendedEntries_V2_1_0(term, logEntries, prevLogIndex) =>
currentData
.syncTerm(term)
Expand Down Expand Up @@ -276,10 +290,7 @@ private[raft] class RaftActor(
case (logEntry, Some(client)) =>
if (log.isDebugEnabled)
log.debug("=== [Leader] committed {} and will notify it to {} ===", logEntry, client)
client.ref.tell(
ReplicationSucceeded(logEntry.event.event, logEntry.index, client.instanceId),
client.originSender.getOrElse(ActorRef.noSender),
)
client.forward(ReplicationSucceeded(logEntry.event.event, logEntry.index, client.instanceId))
case (logEntry, None) =>
// A commit during recovery, or a commit of log entries that were still uncommitted when this member was promoted to leader
applyToReplicationActor(logEntry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,56 @@ private[entityreplication] trait LeaderData { self: RaftMemberData =>
.updateLeaderVolatileState(clients = clients -- applicableLogEntries.map(_.index)) // 通知したクライアントは削除してメモリを節約
}

/** Removes every client registered for a conflicting log entry and passes each of them to `onDiscard`
  *
  * A client conflicts when the log entry index it is registered for is a conflict index;
  * [[ReplicatedLog.findConflict]] describes what a conflict index is. This method does not search for the conflict
  * index itself: the caller determines it beforehand (using [[RaftMemberData.resolveNewLogEntries]] or
  * [[ReplicatedLog.findConflict]]) and passes it as `possiblyConflictIndex`.
  *
  * Every existing log entry from `possiblyConflictIndex` up to [[ReplicatedLog.lastLogIndex]] is treated as
  * conflicting. Nothing is discarded when `possiblyConflictIndex` is `None`, or when it is greater than
  * [[ReplicatedLog.lastLogIndex]] (it is expected to be at most [[ReplicatedLog.lastLogIndex]] + 1).
  *
  * @param possiblyConflictIndex the first index that may conflict, if there is one
  * @param onDiscard handler called once for each discarded client, before the clients are removed
  * @return this instance if no client conflicts; otherwise data from which the conflict clients are removed
  * @see
  *  - [[RaftMemberData.resolveNewLogEntries]]
  *  - [[ReplicatedLog.findConflict]]
  */
def discardConflictClients(
    possiblyConflictIndex: Option[LogEntryIndex],
    onDiscard: ClientContext => Unit,
): RaftMemberData = {
  def lastIndex = replicatedLog.lastLogIndex

  // Indices of the existing entries that start at the given index and reach the end of the log
  val overwrittenIndices = possiblyConflictIndex match {
    case Some(firstIndex) if !(firstIndex >= lastIndex.plus(1)) =>
      replicatedLog
        .sliceEntries(from = firstIndex, to = lastIndex)
        .map(_.index)
    case _ =>
      // No index was given, or the index points past the end of the log: no conflict log entries
      Seq.empty
  }

  // Only the indices that a client is actually registered for matter
  val discardedIndices = overwrittenIndices.filter(clients.contains)
  if (discardedIndices.nonEmpty) {
    if (log.isInfoEnabled) {
      log.info("Found [{}] conflict clients, discarding these clients", discardedIndices.size)
    }
    for (discardedIndex <- discardedIndices) {
      onDiscard(clients(discardedIndex))
    }
    updateLeaderVolatileState(clients = clients.removedAll(discardedIndices))
  } else {
    // No conflict clients
    this
  }
}

protected def updateLeaderVolatileState(
nextIndex: Option[NextIndex] = nextIndex,
matchIndex: MatchIndex = matchIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,14 @@ private[entityreplication] final case class ClientContext(
ref: ActorRef,
instanceId: Option[EntityInstanceId],
originSender: Option[ActorRef],
)
) {

/** Delivers `message` to the actor `ref`, presenting `originSender` as the sender of the message
  *
  * When `originSender` is `None`, [[ActorRef.noSender]] is used as the sender instead.
  */
def forward(message: Any): Unit = {
  val sender = originSender.getOrElse(ActorRef.noSender)
  ref.tell(message, sender)
}

}
10 changes: 10 additions & 0 deletions src/test/scala/lerna/akka/entityreplication/raft/ActorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,20 @@ trait ActorSpec extends WordSpecLike with Matchers with BeforeAndAfterEach with
/** Runs the inherited set-up, waits for the auto-kill manager, and then drops leftover messages of earlier tests */
override def beforeEach(): Unit = {
  super.beforeEach()

  // Presumably blocks until the auto-kill manager replies, i.e. until it has started (see the message text)
  val startUpReply = autoKillManager ? Identify("to wait for start-up")
  startUpReply.await

  // Ignoring all messages sent in the previous unit test case
  ignoreAllMessagesSentBefore()
}

/** Asks the auto-kill manager to kill all actors it manages, then runs the inherited tear-down
  *
  * `super.afterEach()` is called in a `finally` clause so that the tear-down of stacked traits still runs
  * when the `KillAll` request fails or times out. The failure itself is still propagated to the caller.
  */
override def afterEach(): Unit = {
  try {
    (autoKillManager ? TestActorAutoKillManager.KillAll).await
  } finally {
    super.afterEach()
  }
}

/** Discards every message that reached `testActor` before this call
  *
  * A sentinel message is sent to `testActor`, and messages are then consumed until the sentinel itself is received.
  */
private def ignoreAllMessagesSentBefore(): Unit = {
  case object SentinelMessage
  testActor.tell(SentinelMessage, ActorRef.noSender)
  fishForMessage(hint = "ignoring all messages sent before") {
    case SentinelMessage => true  // reached the sentinel: stop fishing
    case _               => false // a message sent before the sentinel: skip it
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import akka.actor.{ typed, ActorSystem }
import akka.persistence.testkit.scaladsl.PersistenceTestKit
import akka.testkit.{ TestKit, TestProbe }
import com.typesafe.config.ConfigFactory
import lerna.akka.entityreplication.model.NormalizedEntityId
import lerna.akka.entityreplication.model.{ EntityInstanceId, NormalizedEntityId }
import lerna.akka.entityreplication.raft.RaftActor.{ AppendedEntries_V2_1_0, Follower }
import lerna.akka.entityreplication.raft.RaftProtocol._
import lerna.akka.entityreplication.raft.eventsourced.CommitLogStoreActor
import lerna.akka.entityreplication.raft.model._
import lerna.akka.entityreplication.raft.protocol.RaftCommands.InstallSnapshot
import lerna.akka.entityreplication.raft.protocol.RaftCommands.{ AppendEntriesSucceeded, InstallSnapshot }
import lerna.akka.entityreplication.raft.protocol.{ FetchEntityEvents, FetchEntityEventsResponse }
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol._
import org.scalatest.Inside

import scala.annotation.nowarn

Expand All @@ -23,7 +24,8 @@ object RaftActorSpec {

class RaftActorSpec
extends TestKit(ActorSystem("RaftActorSpec", RaftActorSpecBase.configWithPersistenceTestKits))
with RaftActorSpecBase {
with RaftActorSpecBase
with Inside {
import RaftActorSpec._

private implicit val typedSystem: typed.ActorSystem[Nothing] = system.toTyped
Expand Down Expand Up @@ -617,6 +619,62 @@ class RaftActorSpec
recoveredLogEntries.map(_.event) should contain theSameElementsInOrderAs expectedEntriesAfterRecover.map(_.event)
}

"send ReplicationFailed messages to conflict clients and discard the clients " +
"if the received AppendEntries message contains conflict entries" in {
  val shardId             = createUniqueShardId()
  val followerMemberIndex = createUniqueMemberIndex()
  val follower            = createRaftActor(shardId = shardId, selfMemberIndex = followerMemberIndex)

  // Two clients are registered for the entries at index 2 and 3; only the first one has an original sender
  val clientProbe1   = TestProbe()
  val clientProbe2   = TestProbe()
  val originalSender = TestProbe().ref
  val client1        = ClientContext(clientProbe1.ref, Some(EntityInstanceId(1)), Some(originalSender))
  val client2        = ClientContext(clientProbe2.ref, Some(EntityInstanceId(2)), None)

  val existingEntries = Seq(
    LogEntry(LogEntryIndex(1), EntityEvent(None, NoOp), Term(1)),
    LogEntry(LogEntryIndex(2), EntityEvent(Some(NormalizedEntityId("entity-1")), "event-a"), Term(1)),
    LogEntry(LogEntryIndex(3), EntityEvent(Some(NormalizedEntityId("entity-2")), "event-b"), Term(1)),
    LogEntry(LogEntryIndex(4), EntityEvent(None, NoOp), Term(2)),
  )
  val followerData =
    RaftMemberData(
      currentTerm = Term(2),
      replicatedLog = ReplicatedLog().truncateAndAppend(existingEntries),
      commitIndex = LogEntryIndex(1),
    ).registerClient(client1, LogEntryIndex(2))
      .registerClient(client2, LogEntryIndex(3))
  setState(follower, Follower, followerData)

  // The leader sends an entry for index 2 that differs from the existing one (term 3 instead of term 1)
  val leaderMemberIndex = createUniqueMemberIndex()
  val entriesFromLeader = Seq(
    LogEntry(LogEntryIndex(2), EntityEvent(None, NoOp), Term(3)),
  )
  val appendEntries = createAppendEntries(
    shardId,
    Term(3),
    leaderMemberIndex,
    prevLogIndex = LogEntryIndex(1),
    prevLogTerm = Term(1),
    entries = entriesFromLeader,
  )
  val regionProbe = TestProbe()
  follower.tell(appendEntries, regionProbe.ref)
  regionProbe.expectMsg(AppendEntriesSucceeded(Term(3), LogEntryIndex(2), followerMemberIndex))

  // Verifies that the RaftActor discards conflict clients.
  val remainingClients = getState(follower).stateData.clients
  remainingClients.contains(LogEntryIndex(2)) should be(false)
  remainingClients.contains(LogEntryIndex(3)) should be(false)

  // Verifies that each conflict client receives a ReplicationFailed message
  clientProbe1.expectMsg(ReplicationFailed)
  clientProbe1.sender() should be(originalSender) // including the original sender
  clientProbe2.expectMsg(ReplicationFailed)
  clientProbe2.sender() should be(system.deadLetters) // including no original sender
}

}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lerna.akka.entityreplication.raft

import lerna.akka.entityreplication.model.NormalizedEntityId
import akka.actor.ActorRef
import lerna.akka.entityreplication.model.{ EntityInstanceId, NormalizedEntityId }
import lerna.akka.entityreplication.raft.model._
import org.scalatest.{ FlatSpec, Inside, Matchers }

Expand Down Expand Up @@ -773,6 +774,97 @@ final class RaftMemberDataSpec extends FlatSpec with Matchers with Inside {
)
}

behavior of "RaftMemberData.discardConflictClients"

/** Builds member data that holds log entries at index 1 to 3, with `client1` registered for index 2 and
  * `client2` registered for index 3
  */
private def dataWithClientsAtIndex2And3(
    client1: ClientContext = ClientContext(ActorRef.noSender, Some(EntityInstanceId(1)), None),
    client2: ClientContext = ClientContext(ActorRef.noSender, Some(EntityInstanceId(2)), None),
) =
  RaftMemberData()
    .truncateAndAppendEntries(
      Seq(
        LogEntry(LogEntryIndex(1), EntityEvent(None, NoOp), Term(1)),
        LogEntry(LogEntryIndex(2), EntityEvent(Some(NormalizedEntityId("entity-1")), "event-a"), Term(1)),
        LogEntry(LogEntryIndex(3), EntityEvent(Some(NormalizedEntityId("entity-2")), "event-b"), Term(1)),
      ),
    )
    .registerClient(client1, LogEntryIndex(2))
    .registerClient(client2, LogEntryIndex(3))

it should "discard no existing clients and call no onDiscard handler if the given index is None" in {
  val data = dataWithClientsAtIndex2And3()

  var discardedClients = Set.empty[ClientContext]
  val newData          = data.discardConflictClients(None, discarded => discardedClients += discarded)

  newData.clients should be(data.clients)
  discardedClients should be(empty)
}

it should "discard no existing clients and call no onDiscard handler if the given index is greater than the last log index plus one" in {
  val data = dataWithClientsAtIndex2And3()

  // The last log index is 3, so index 5 is greater than the last log index plus one
  var discardedClients = Set.empty[ClientContext]
  val newData = data.discardConflictClients(
    Some(LogEntryIndex(5)),
    discarded => discardedClients += discarded,
  )

  newData.clients should be(data.clients)
  discardedClients should be(empty)
}

it should "discard no existing clients and call no onDiscard handler if the given index is equal to the last log index plus one" in {
  val data = dataWithClientsAtIndex2And3()

  // The last log index is 3, so index 4 is exactly the last log index plus one
  var discardedClients = Set.empty[ClientContext]
  val newData = data.discardConflictClients(
    Some(LogEntryIndex(4)),
    discarded => discardedClients += discarded,
  )

  newData.clients should be(data.clients)
  discardedClients should be(empty)
}

it should "discard conflict clients and call onDiscard handler with each conflict clients if conflict clients exist" in {
  val client1 = ClientContext(ActorRef.noSender, Some(EntityInstanceId(1)), None)
  val client2 = ClientContext(ActorRef.noSender, Some(EntityInstanceId(2)), None)
  val data    = dataWithClientsAtIndex2And3(client1, client2)

  // Index 2 and every later existing entry (index 3) are passed as conflicting
  var discardedClients = Set.empty[ClientContext]
  val newData = data.discardConflictClients(
    Some(LogEntryIndex(2)),
    discarded => discardedClients += discarded,
  )

  newData.clients.contains(LogEntryIndex(2)) should be(false)
  newData.clients.contains(LogEntryIndex(3)) should be(false)
  discardedClients.contains(client1) should be(true)
  discardedClients.contains(client2) should be(true)
}

/** Creates an entity ID from the string form of a newly generated random UUID */
private def generateEntityId() = {
  val randomId = UUID.randomUUID().toString
  NormalizedEntityId.from(randomId)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package lerna.akka.entityreplication.raft.model

import akka.actor.ActorSystem
import akka.testkit.{ TestKit, TestProbe }
import lerna.akka.entityreplication.raft.ActorSpec

/** Checks which sender [[ClientContext.forward]] attaches to the message it delivers */
final class ClientContextSpec extends TestKit(ActorSystem("ClientContextSpec")) with ActorSpec {

  "ClientContext.forward should send the given message to the actor, including no sender, if the context doesn't have an original sender" in {
    val receiver          = TestProbe()
    val senderlessContext = ClientContext(receiver.ref, instanceId = None, originSender = None)

    senderlessContext.forward("message-1")

    receiver.expectMsg("message-1")
    // A message sent without a sender is observed with the dead-letters actor as its sender
    receiver.sender() should be(system.deadLetters)
  }

  "ClientContext.forward should send the given message to the actor, including an original sender, if the context has the original sender" in {
    val receiver          = TestProbe()
    val originalSender    = TestProbe().ref
    val contextWithSender = ClientContext(receiver.ref, instanceId = None, originSender = Some(originalSender))

    contextWithSender.forward("message-1")

    receiver.expectMsg("message-1")
    receiver.sender() should be(originalSender)
  }

}

0 comments on commit e415a69

Please sign in to comment.