Skip to content

Commit

Permalink
Merge pull request #171 from lerna-stack/start-replication-if-non-app…
Browse files Browse the repository at this point in the history
…lied-entries-have-only-noops

Leader starts replication if non-applied entries contain only NoOps
  • Loading branch information
negokaz authored Aug 23, 2022
2 parents 0bd7465 + 70ddf99 commit 457f529
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 5 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[PR#158](https://github.com/lerna-stack/akka-entity-replication/pull/158)
- Leader cannot reply to an entity with a `ReplicationFailed` message in some cases
[#153](https://github.com/lerna-stack/akka-entity-replication/issues/153),
[PR#161](https://github.com/lerna-stack/akka-entity-replication/pull/161)
[PR#161](https://github.com/lerna-stack/akka-entity-replication/pull/161),
[#170](https://github.com/lerna-stack/akka-entity-replication/issues/170),
[PR#171](https://github.com/lerna-stack/akka-entity-replication/pull/171)
- An entity could stick at WaitForReplication when a Raft log entry is truncated by conflict
[#155](https://github.com/lerna-stack/akka-entity-replication/issues/155),
[#PR162](https://github.com/lerna-stack/akka-entity-replication/pull/162)
Expand Down
13 changes: 9 additions & 4 deletions src/main/scala/lerna/akka/entityreplication/raft/Leader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ private[raft] trait Leader { this: RaftActor =>

private[this] def replicate(replicate: Replicate): Unit = {
def startReplication(): Unit = {
if (log.isDebugEnabled) {
log.debug("=== [Leader] starting replication for [{}]", replicate)
}
cancelHeartbeatTimeoutTimer()
applyDomainEvent(AppendedEvent(EntityEvent(replicate.entityId, replicate.event))) { _ =>
applyDomainEvent(
Expand All @@ -210,10 +213,12 @@ private[raft] trait Leader { this: RaftActor =>
replicate match {
case replicate: Replicate.ReplicateForEntity =>
val nonAppliedEntityEntries =
currentData.replicatedLog.sliceEntityEntries(
replicate._entityId,
from = replicate._entityLastAppliedIndex.next(),
)
currentData.replicatedLog
.sliceEntityEntries(
replicate._entityId,
from = replicate._entityLastAppliedIndex.next(),
)
.filterNot(_.event.event == NoOp)
if (nonAppliedEntityEntries.nonEmpty) {
if (log.isWarningEnabled) {
val eventClassName = replicate.event.getClass.getName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1454,6 +1454,50 @@ class RaftActorLeaderSpec
}
}

"start replication if it receives a Replicate message sent from an entity whose non-applied entries contain only NoOp" in {
val entityId = NormalizedEntityId("entity-1")
val leaderMemberIndex = createUniqueMemberIndex()
val followerMemberIndex = createUniqueMemberIndex()
val leader = createRaftActor(
selfMemberIndex = leaderMemberIndex,
otherMemberIndexes = Set(followerMemberIndex),
)
val leaderData = {
val replicatedLog =
ReplicatedLog()
.reset(Term(2), LogEntryIndex(4))
.truncateAndAppend(
Seq(
LogEntry(LogEntryIndex(5), EntityEvent(None, NoOp), Term(3)),
LogEntry(LogEntryIndex(6), EntityEvent(Some(entityId), "event-a"), Term(3)),
LogEntry(LogEntryIndex(7), EntityEvent(Some(entityId), NoOp), Term(3)),
),
)
createLeaderData(
currentTerm = Term(3),
log = replicatedLog,
commitIndex = LogEntryIndex(7),
lastApplied = LogEntryIndex(7),
)
}
setState(leader, Leader, leaderData)

val replicate =
Replicate(
event = "event-b",
replyTo = TestProbe().ref,
entityId = entityId,
instanceId = EntityInstanceId(987),
entityLastAppliedIndex = LogEntryIndex(6),
originSender = system.deadLetters,
)
// Verify log to check replication starts.
LoggingTestKit
.debug(s"=== [Leader] starting replication for [$replicate]").expect {
leader ! replicate
}
}

"log a warning if it receives a ReplicationSucceeded message containing an event other than NoOp" in {
val leader = createRaftActor()
val leaderData = createLeaderData(Term(1))
Expand Down

0 comments on commit 457f529

Please sign in to comment.