Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notify replication failure to conflict clients #162

Merged
merged 6 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Leader cannot reply to an entity with a `ReplicationFailed` message in some cases
[#153](https://github.com/lerna-stack/akka-entity-replication/issues/153),
[PR#161](https://github.com/lerna-stack/akka-entity-replication/pull/161)
- An entity could stick at WaitForReplication when a Raft log entry is truncated by conflict
[#155](https://github.com/lerna-stack/akka-entity-replication/issues/155),
[PR#162](https://github.com/lerna-stack/akka-entity-replication/pull/162)
- A RaftActor(Leader) could mis-deliver a ReplicationSucceeded message to a different entity
[#156](https://github.com/lerna-stack/akka-entity-replication/issues/156),
[PR#162](https://github.com/lerna-stack/akka-entity-replication/pull/162)

## [v2.1.0] - 2022-03-24
[v2.1.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.0.0...v2.1.0
Expand Down
23 changes: 17 additions & 6 deletions src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,24 @@ private[raft] class RaftActor(
currentData.vote(candidate, term)
case DetectedNewTerm(term) =>
currentData.syncTerm(term)
case AppendedEntries(term, logEntries) =>
case AppendedEntries(term, newLogEntries) =>
currentData
.syncTerm(term)
.truncateAndAppendEntries(logEntries)
.discardConflictClients(
possiblyConflictIndex = newLogEntries.headOption.map(_.index),
conflictClient => {
if (log.isDebugEnabled) {
log.debug(
"[{}] sending ReplicationFailed to [{}], including sender [{}]",
currentState,
conflictClient.ref,
conflictClient.originSender,
)
}
conflictClient.forward(ReplicationFailed)
},
)
.truncateAndAppendEntries(newLogEntries)
case AppendedEntries_V2_1_0(term, logEntries, prevLogIndex) =>
currentData
.syncTerm(term)
Expand Down Expand Up @@ -276,10 +290,7 @@ private[raft] class RaftActor(
case (logEntry, Some(client)) =>
if (log.isDebugEnabled)
log.debug("=== [Leader] committed {} and will notify it to {} ===", logEntry, client)
client.ref.tell(
ReplicationSucceeded(logEntry.event.event, logEntry.index, client.instanceId),
client.originSender.getOrElse(ActorRef.noSender),
)
client.forward(ReplicationSucceeded(logEntry.event.event, logEntry.index, client.instanceId))
case (logEntry, None) =>
// A commit during recovery, or a commit of log entries that were still uncommitted when this member was promoted to leader
applyToReplicationActor(logEntry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,56 @@ private[entityreplication] trait LeaderData { self: RaftMemberData =>
.updateLeaderVolatileState(clients = clients -- applicableLogEntries.map(_.index)) // 通知したクライアントは削除してメモリを節約
}

/** Removes every client registered at a conflicting log index and hands each removed client to `onDiscard`.
  *
  * A client counts as a conflict client when the log index it is registered with is a conflict index;
  * see [[ReplicatedLog.findConflict]] for what a conflict index is. This method does not search for the
  * conflict index itself: the caller has to determine it beforehand, using
  * [[RaftMemberData.resolveNewLogEntries]] or [[ReplicatedLog.findConflict]].
  *
  * Nothing is discarded, `onDiscard` is not called, and this instance is returned as-is when
  *  - `possiblyConflictIndex` is `None`, or
  *  - `possiblyConflictIndex` is greater than [[ReplicatedLog.lastLogIndex]]
  *    (it should always be less than or equal to `lastLogIndex + 1`), or
  *  - no client is registered at the index of any log entry from `possiblyConflictIndex`
  *    through [[ReplicatedLog.lastLogIndex]].
  *
  * @param possiblyConflictIndex index of the first log entry that may conflict, if any
  * @param onDiscard handler called once for each conflict client before the clients are removed
  * @return data without the conflict clients, or this instance if there are none
  * @see
  *  - [[RaftMemberData.resolveNewLogEntries]]
  *  - [[ReplicatedLog.findConflict]]
  */
def discardConflictClients(
    possiblyConflictIndex: Option[LogEntryIndex],
    onDiscard: ClientContext => Unit,
): RaftMemberData = {
  // Indices of the existing log entries located at or after the possibly conflicting index.
  val conflictIndices = possiblyConflictIndex match {
    case Some(firstConflictIndex) if !(firstConflictIndex >= replicatedLog.lastLogIndex.plus(1)) =>
      replicatedLog
        .sliceEntries(from = firstConflictIndex, to = replicatedLog.lastLogIndex)
        .map(_.index)
    case _ =>
      // Either no index was given, or it lies beyond the last log entry: no conflict log entries.
      Seq.empty
  }
  // Only indices that actually have a registered client are of interest.
  val conflictClientIndices = conflictIndices.filter(clients.contains)
  if (conflictClientIndices.nonEmpty) {
    if (log.isInfoEnabled) {
      log.info("Found [{}] conflict clients, discarding these clients", conflictClientIndices.size)
    }
    for (conflictIndex <- conflictClientIndices) {
      onDiscard(clients(conflictIndex))
    }
    updateLeaderVolatileState(clients = clients -- conflictClientIndices)
  } else {
    // No conflict clients
    this
  }
}

protected def updateLeaderVolatileState(
nextIndex: Option[NextIndex] = nextIndex,
matchIndex: MatchIndex = matchIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,14 @@ private[entityreplication] final case class ClientContext(
ref: ActorRef,
instanceId: Option[EntityInstanceId],
originSender: Option[ActorRef],
)
) {

/** Delivers `message` to the actor `ref`, with `originSender` set as the sender of the message
  *
  * When `originSender` is `None`, [[ActorRef.noSender]] is used as the sender instead.
  *
  * @param message the message to deliver to `ref`
  */
def forward(message: Any): Unit = {
  val sender = originSender.getOrElse(ActorRef.noSender)
  ref.tell(message, sender)
}

}
10 changes: 10 additions & 0 deletions src/test/scala/lerna/akka/entityreplication/raft/ActorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,20 @@ trait ActorSpec extends WordSpecLike with Matchers with BeforeAndAfterEach with
/** Prepares each test case: asks the auto-kill manager to identify itself and awaits the reply,
  * then drops the messages still queued on the test actor.
  */
override def beforeEach(): Unit = {
  super.beforeEach()
  // The reply itself is not used; awaiting it only makes the set-up wait for the manager.
  val startUpReply = autoKillManager ? Identify("to wait for start-up")
  startUpReply.await

  // Messages sent in the previous unit test case must not leak into this one.
  ignoreAllMessagesSentBefore()
}

/** Cleans up after each test case: sends `KillAll` to the auto-kill manager and awaits the reply
  * before running the inherited clean-up.
  */
override def afterEach(): Unit = {
  val killAllReply = autoKillManager ? TestActorAutoKillManager.KillAll
  killAllReply.await
  super.afterEach()
}

/** Drains the test actor's queue of everything received so far.
  *
  * A sentinel message is sent to the test actor, and messages are then consumed until that
  * sentinel is seen; every message received before it is thereby dropped.
  */
private def ignoreAllMessagesSentBefore(): Unit = {
  case object SentinelMessage
  testActor.tell(SentinelMessage, ActorRef.noSender)
  fishForMessage(hint = "ignoring all messages sent before") {
    case SentinelMessage => true
    case _               => false
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import akka.actor.{ typed, ActorSystem }
import akka.persistence.testkit.scaladsl.PersistenceTestKit
import akka.testkit.{ TestKit, TestProbe }
import com.typesafe.config.ConfigFactory
import lerna.akka.entityreplication.model.NormalizedEntityId
import lerna.akka.entityreplication.model.{ EntityInstanceId, NormalizedEntityId }
import lerna.akka.entityreplication.raft.RaftActor.{ AppendedEntries_V2_1_0, Follower }
import lerna.akka.entityreplication.raft.RaftProtocol._
import lerna.akka.entityreplication.raft.eventsourced.CommitLogStoreActor
import lerna.akka.entityreplication.raft.model._
import lerna.akka.entityreplication.raft.protocol.RaftCommands.InstallSnapshot
import lerna.akka.entityreplication.raft.protocol.RaftCommands.{ AppendEntriesSucceeded, InstallSnapshot }
import lerna.akka.entityreplication.raft.protocol.{ FetchEntityEvents, FetchEntityEventsResponse }
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol._
import org.scalatest.Inside

import scala.annotation.nowarn

Expand All @@ -23,7 +24,8 @@ object RaftActorSpec {

class RaftActorSpec
extends TestKit(ActorSystem("RaftActorSpec", RaftActorSpecBase.configWithPersistenceTestKits))
with RaftActorSpecBase {
with RaftActorSpecBase
with Inside {
import RaftActorSpec._

private implicit val typedSystem: typed.ActorSystem[Nothing] = system.toTyped
Expand Down Expand Up @@ -617,6 +619,62 @@ class RaftActorSpec
recoveredLogEntries.map(_.event) should contain theSameElementsInOrderAs expectedEntriesAfterRecover.map(_.event)
}

"send ReplicationFailed messages to conflict clients and discard the clients " +
"if the received AppendEntries message contains conflict entries" in {
val shardId = createUniqueShardId()
val followerMemberIndex = createUniqueMemberIndex()
val follower = createRaftActor(
shardId = shardId,
selfMemberIndex = followerMemberIndex,
)
val clientProbe1 = TestProbe()
val clientProbe2 = TestProbe()
val client1 = ClientContext(clientProbe1.ref, Some(EntityInstanceId(1)), Some(TestProbe().ref))
val client2 = ClientContext(clientProbe2.ref, Some(EntityInstanceId(2)), None)
val followerData = {
val replicatedLog = ReplicatedLog().truncateAndAppend(
Seq(
LogEntry(LogEntryIndex(1), EntityEvent(None, NoOp), Term(1)),
LogEntry(LogEntryIndex(2), EntityEvent(Some(NormalizedEntityId("entity-1")), "event-a"), Term(1)),
LogEntry(LogEntryIndex(3), EntityEvent(Some(NormalizedEntityId("entity-2")), "event-b"), Term(1)),
LogEntry(LogEntryIndex(4), EntityEvent(None, NoOp), Term(2)),
),
)
RaftMemberData(currentTerm = Term(2), replicatedLog = replicatedLog, commitIndex = LogEntryIndex(1))
.registerClient(client1, LogEntryIndex(2))
.registerClient(client2, LogEntryIndex(3))
}
setState(follower, Follower, followerData)

val leaderMemberIndex = createUniqueMemberIndex()
val appendEntries = createAppendEntries(
shardId,
Term(3),
leaderMemberIndex,
prevLogIndex = LogEntryIndex(1),
prevLogTerm = Term(1),
entries = Seq(
LogEntry(LogEntryIndex(2), EntityEvent(None, NoOp), Term(3)),
),
)
val regionProbe = TestProbe()
follower.tell(appendEntries, regionProbe.ref)
regionProbe.expectMsg(AppendEntriesSucceeded(Term(3), LogEntryIndex(2), followerMemberIndex))

// Verifies that the RaftActor discards conflict clients.
inside(getState(follower).stateData) { newFollowerData =>
assert(!newFollowerData.clients.contains(LogEntryIndex(2)))
assert(!newFollowerData.clients.contains(LogEntryIndex(3)))
}

// Verifies that each conflict client receives a ReplicationFailed message
clientProbe1.expectMsg(ReplicationFailed)
assert(clientProbe1.sender() == client1.originSender.get) // including the original sender
clientProbe2.expectMsg(ReplicationFailed)
assert(clientProbe2.sender() == system.deadLetters) // including no original sender

}

}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lerna.akka.entityreplication.raft

import lerna.akka.entityreplication.model.NormalizedEntityId
import akka.actor.ActorRef
import lerna.akka.entityreplication.model.{ EntityInstanceId, NormalizedEntityId }
import lerna.akka.entityreplication.raft.model._
import org.scalatest.{ FlatSpec, Inside, Matchers }

Expand Down Expand Up @@ -773,6 +774,97 @@ final class RaftMemberDataSpec extends FlatSpec with Matchers with Inside {
)
}

behavior of "RaftMemberData.discardConflictClients"

it should "discard no existing clients and call no onDiscard handler if the given index is None" in {
val data = RaftMemberData()
.truncateAndAppendEntries(
Seq(
LogEntry(LogEntryIndex(1), EntityEvent(None, NoOp), Term(1)),
LogEntry(LogEntryIndex(2), EntityEvent(Some(NormalizedEntityId("entity-1")), "event-a"), Term(1)),
LogEntry(LogEntryIndex(3), EntityEvent(Some(NormalizedEntityId("entity-2")), "event-b"), Term(1)),
),
)
.registerClient(ClientContext(ActorRef.noSender, Some(EntityInstanceId(1)), None), LogEntryIndex(2))
.registerClient(ClientContext(ActorRef.noSender, Some(EntityInstanceId(2)), None), LogEntryIndex(3))

var discardedClients = Set.empty[ClientContext]
val newData = data.discardConflictClients(None, conflictClient => discardedClients += conflictClient)

newData.clients should be(data.clients)
discardedClients should be(empty)
}

it should "discard no existing clients and call no onDiscard handler if the given index is greater than the last log index plus one" in {
val data = RaftMemberData()
.truncateAndAppendEntries(
Seq(
LogEntry(LogEntryIndex(1), EntityEvent(None, NoOp), Term(1)),
LogEntry(LogEntryIndex(2), EntityEvent(Some(NormalizedEntityId("entity-1")), "event-a"), Term(1)),
LogEntry(LogEntryIndex(3), EntityEvent(Some(NormalizedEntityId("entity-2")), "event-b"), Term(1)),
),
)
.registerClient(ClientContext(ActorRef.noSender, Some(EntityInstanceId(1)), None), LogEntryIndex(2))
.registerClient(ClientContext(ActorRef.noSender, Some(EntityInstanceId(2)), None), LogEntryIndex(3))

var discardedClients = Set.empty[ClientContext]
val newData = data.discardConflictClients(
Some(LogEntryIndex(5)),
conflictClient => discardedClients += conflictClient,
)

newData.clients should be(data.clients)
discardedClients should be(empty)
}

it should "discard no existing clients and call no onDiscard handler if the given index is equal to the last log index plus one" in {
val data = RaftMemberData()
.truncateAndAppendEntries(
Seq(
LogEntry(LogEntryIndex(1), EntityEvent(None, NoOp), Term(1)),
LogEntry(LogEntryIndex(2), EntityEvent(Some(NormalizedEntityId("entity-1")), "event-a"), Term(1)),
LogEntry(LogEntryIndex(3), EntityEvent(Some(NormalizedEntityId("entity-2")), "event-b"), Term(1)),
),
)
.registerClient(ClientContext(ActorRef.noSender, Some(EntityInstanceId(1)), None), LogEntryIndex(2))
.registerClient(ClientContext(ActorRef.noSender, Some(EntityInstanceId(2)), None), LogEntryIndex(3))

var discardedClients = Set.empty[ClientContext]
val newData = data.discardConflictClients(
Some(LogEntryIndex(4)),
conflictClient => discardedClients += conflictClient,
)

newData.clients should be(data.clients)
discardedClients should be(empty)
}

it should "discard conflict clients and call onDiscard handler with each conflict clients if conflict clients exist" in {
val client1 = ClientContext(ActorRef.noSender, Some(EntityInstanceId(1)), None)
val client2 = ClientContext(ActorRef.noSender, Some(EntityInstanceId(2)), None)
val data = RaftMemberData()
.truncateAndAppendEntries(
Seq(
LogEntry(LogEntryIndex(1), EntityEvent(None, NoOp), Term(1)),
LogEntry(LogEntryIndex(2), EntityEvent(Some(NormalizedEntityId("entity-1")), "event-a"), Term(1)),
LogEntry(LogEntryIndex(3), EntityEvent(Some(NormalizedEntityId("entity-2")), "event-b"), Term(1)),
),
)
.registerClient(client1, LogEntryIndex(2))
.registerClient(client2, LogEntryIndex(3))

var discardedClients = Set.empty[ClientContext]
val newData = data.discardConflictClients(
Some(LogEntryIndex(2)),
conflictClient => discardedClients += conflictClient,
)

assert(!newData.clients.contains(LogEntryIndex(2)))
assert(!newData.clients.contains(LogEntryIndex(3)))
assert(discardedClients.contains(client1))
assert(discardedClients.contains(client2))
}

/** Creates an entity ID from the string form of a freshly generated random UUID. */
private def generateEntityId() = {
  val randomId = UUID.randomUUID().toString
  NormalizedEntityId.from(randomId)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package lerna.akka.entityreplication.raft.model

import akka.actor.ActorSystem
import akka.testkit.{ TestKit, TestProbe }
import lerna.akka.entityreplication.raft.ActorSpec

/** Checks which sender `ClientContext.forward` attaches to the message it delivers. */
final class ClientContextSpec extends TestKit(ActorSystem("ClientContextSpec")) with ActorSpec {

  "ClientContext.forward should send the given message to the actor, including no sender, if the context doesn't have an original sender" in {
    val receiver  = TestProbe()
    val clientCtx = ClientContext(receiver.ref, instanceId = None, originSender = None)

    clientCtx.forward("message-1")

    receiver.expectMsg("message-1")
    // With no original sender, the received message is expected to carry dead letters as its sender.
    receiver.sender() shouldBe system.deadLetters
  }

  "ClientContext.forward should send the given message to the actor, including an original sender, if the context has the original sender" in {
    val receiver   = TestProbe()
    val originator = TestProbe().ref
    val clientCtx  = ClientContext(receiver.ref, instanceId = None, originSender = Some(originator))

    clientCtx.forward("message-1")

    receiver.expectMsg("message-1")
    // The original sender must be preserved on the delivered message.
    receiver.sender() shouldBe originator
  }

}