Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow only specific actors defined as a sticky leader to become a leader #186

Merged
merged 29 commits into from
Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a15e43a
feat: add setting of sticky leader
nyamada-tis Nov 7, 2022
5e9491a
test: add setting of sticky leader
nyamada-tis Nov 7, 2022
4e118a9
feat: allow only raft actors defined as sticky leader become leader
nyamada-tis Nov 7, 2022
45d7355
test: allow only raft actors defined as sticky leader become leader
nyamada-tis Nov 7, 2022
55d8ea8
refactor: add private[this] modifier to receiveElectionTimeout function
nyamada-tis Nov 7, 2022
8287609
refactor: retrieve method of receiveElectionTimeout in Candidate.scala
nyamada-tis Nov 7, 2022
720ebcf
feat: allow only raft actors defined as sticky leader become leader
nyamada-tis Nov 7, 2022
1e467e9
test: allow only raft actors defined as sticky leader become leader
nyamada-tis Nov 7, 2022
f20d7c1
feat: notice warn log if sticky-leaders is configured
nyamada-tis Nov 7, 2022
e641326
test: notice warn log if sticky-leaders is configured
nyamada-tis Nov 7, 2022
2230615
docs: update CHANGELOG.md
nyamada-tis Nov 7, 2022
6adaba7
Merge branch 'master' into allow-only-specific-actors-to-become-leader
ydash Nov 8, 2022
84ff33b
refactor: set sticky leaders by API instead of configuration file
nyamada-tis Nov 16, 2022
658e561
chore: formatting
nyamada-tis Nov 16, 2022
5d7da8b
test: optimize imports
nyamada-tis Nov 16, 2022
687752c
Merge branch 'master' into allow-only-specific-actors-to-become-leader
ydash Nov 16, 2022
870c9c5
chore: add MiMa filter for ClusterReplicationSettings.withStickyLeaders
nyamada-tis Nov 16, 2022
e3b3ccc
fix: fix sticky leader function: RaftActor can not become Candidate i…
nyamada-tis Nov 17, 2022
b6e0030
refactor: change log message and trigger of logging
nyamada-tis Nov 17, 2022
d99c78b
docs: add explanation of fixing leader setting
nyamada-tis Nov 17, 2022
98e4f45
chore: fix line spacing
nyamada-tis Nov 17, 2022
b07b22f
Merge branch 'master' into allow-only-specific-actors-to-become-leader
ydash Nov 17, 2022
ba7d2cd
docs: modify explanation of sticky leader
nyamada-tis Nov 18, 2022
9c72550
refactor: make the log messag one line
nyamada-tis Nov 18, 2022
3e547ae
test: add a test case that a raft actor whose shard id doesn't have s…
nyamada-tis Nov 18, 2022
4704e1d
docs: update explanation of sticky leader
nyamada-tis Nov 18, 2022
815b932
docs: correct a grammatical error
nyamada-tis Nov 18, 2022
d0be100
refactor: make function isStickyLeader for ease of understanding
nyamada-tis Nov 18, 2022
2aec64a
refactor: remove unnecessary arguments from canBecomeCandidate
nyamada-tis Nov 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[PR178](https://github.com/lerna-stack/akka-entity-replication/pull/178)
- Add diagnostic info to logs of sending replication results
[PR#179](https://github.com/lerna-stack/akka-entity-replication/pull/179)
- Allow only specific actors defined as a sticky leader to become a leader
[PR#186](https://github.com/lerna-stack/akka-entity-replication/pull/186)

### Fixed
- RaftActor might delete committed entries
Expand Down
25 changes: 25 additions & 0 deletions docs/implementation_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,31 @@ AtLeastOnceComplete.askTo(
)
```

### Fixing leader actor per shard

By default, all raft actors can become a Leader.
You can fix a Leader actor per shard by setting a sticky leader.
Fixing leader actor prevents committed event loss during recovery.

```scala
import akka.actor.typed.ActorSystem
import lerna.akka.entityreplication.typed._

val system: ActorSystem[_] = ???
val clusterReplication = ClusterReplication(system)

// Settings for sticky leader. Only a raft actor which have role "replica-group-1" can become Leader in shard given id "1"
val settings =
ClusterReplicationSettings(system)
.withStickyLeaders(Map("1" -> "replica-group-1"))

val entity =
ReplicatedEntity(BankAccountBehavior.TypeKey)(entityContext => BankAccountBehavior(entityContext))
.withSettings(settings)

clusterReplication.init(entity)
```

### Configuration

On the command side, the related settings are defined at `lerna.akka.entityreplication`(except `lerna.akka.entityreplication.raft.eventsourced`) in [reference.conf](/src/main/resources/reference.conf).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# ClusterReplicationSettings should not be extended by users
ProblemFilters.exclude[ReversedMissingMethodProblem]("lerna.akka.entityreplication.ClusterReplicationSettings.withStickyLeaders")
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ trait ClusterReplicationSettings {

def withDisabledShards(disabledShards: Set[String]): ClusterReplicationSettings

def withStickyLeaders(stickyLeaders: Map[String, String]): ClusterReplicationSettings
negokaz marked this conversation as resolved.
Show resolved Hide resolved
negokaz marked this conversation as resolved.
Show resolved Hide resolved

def withRaftJournalPluginId(pluginId: String): ClusterReplicationSettings

def withRaftSnapshotPluginId(pluginId: String): ClusterReplicationSettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ private[entityreplication] final case class ClusterReplicationSettingsImpl(
override def withDisabledShards(disabledShards: Set[String]): ClusterReplicationSettings =
copy(raftSettings = raftSettings.withDisabledShards(disabledShards))

override def withStickyLeaders(stickyLeaders: Map[String, String]): ClusterReplicationSettingsImpl =
copy(raftSettings = raftSettings.withStickyLeaders(stickyLeaders))

override def withRaftJournalPluginId(pluginId: String): ClusterReplicationSettingsImpl =
copy(raftSettings = raftSettings.withJournalPluginId(pluginId))

Expand Down
41 changes: 22 additions & 19 deletions src/main/scala/lerna/akka/entityreplication/raft/Candidate.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,7 @@ private[raft] trait Candidate { this: RaftActor =>
import RaftActor._

def candidateBehavior: Receive = {

case ElectionTimeout =>
if (log.isInfoEnabled)
log.info("[Candidate] Election timeout at {}. Retrying leader election.", currentData.currentTerm)
val newTerm = currentData.currentTerm.next()
cancelElectionTimeoutTimer()
broadcast(
RequestVote(
shardId,
newTerm,
selfMemberIndex,
currentData.replicatedLog.lastLogIndex,
currentData.replicatedLog.lastLogTerm,
),
) // TODO: 永続化前に broadcast して問題ないか調べる
applyDomainEvent(BegunNewTerm(newTerm)) { _ =>
become(Candidate)
}

case ElectionTimeout => receiveElectionTimeout()
case request: RequestVote => receiveRequestVote(request)
case response: RequestVoteResponse => receiveRequestVoteResponse(response)
case request: AppendEntries => receiveAppendEntries(request)
Expand All @@ -55,6 +37,27 @@ private[raft] trait Candidate { this: RaftActor =>

}

private def receiveElectionTimeout(): Unit = {
if (log.isInfoEnabled)
log.info("[Candidate] Election timeout at {}. Retrying leader election.", currentData.currentTerm)
val newTerm = currentData.currentTerm.next()
cancelElectionTimeoutTimer()
if (canBecomeCandidate) {
broadcast(
RequestVote(
shardId,
newTerm,
selfMemberIndex,
currentData.replicatedLog.lastLogIndex,
currentData.replicatedLog.lastLogTerm,
),
) // TODO: 永続化前に broadcast して問題ないか調べる
applyDomainEvent(BegunNewTerm(newTerm)) { _ =>
become(Candidate)
}
}
}

private[this] def receiveRequestVote(request: RequestVote): Unit =
request match {

Expand Down
23 changes: 13 additions & 10 deletions src/main/scala/lerna/akka/entityreplication/raft/Follower.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,7 @@ private[raft] trait Follower { this: RaftActor =>
import RaftActor._

def followerBehavior: Receive = {

case ElectionTimeout =>
if (currentData.leaderMember.isEmpty) {
if (log.isDebugEnabled) log.debug("=== [Follower] election timeout ===")
} else {
if (log.isWarningEnabled) log.warning("[{}] election timeout. Leader will be changed", currentState)
}
requestVote(currentData)

case ElectionTimeout => receiveElectionTimeout()
case request: RequestVote => receiveRequestVote(request)
case request: AppendEntries => receiveAppendEntries(request)
case request: InstallSnapshot => receiveInstallSnapshot(request)
Expand All @@ -45,6 +37,18 @@ private[raft] trait Follower { this: RaftActor =>

}

private[this] def receiveElectionTimeout(): Unit = {
if (currentData.leaderMember.isEmpty) {
if (log.isDebugEnabled) log.debug("=== [Follower] election timeout ===")
} else {
if (log.isWarningEnabled) log.warning("[{}] election timeout. Leader will be changed", currentState)
}
cancelElectionTimeoutTimer()
if (canBecomeCandidate) {
requestVote(currentData)
}
}

private[this] def receiveRequestVote(request: RequestVote): Unit =
request match {

Expand Down Expand Up @@ -150,7 +154,6 @@ private[raft] trait Follower { this: RaftActor =>

private[this] def requestVote(data: RaftMemberData): Unit = {
val newTerm = data.currentTerm.next()
cancelElectionTimeoutTimer()
broadcast(
RequestVote(shardId, newTerm, selfMemberIndex, data.replicatedLog.lastLogIndex, data.replicatedLog.lastLogTerm),
) // TODO: 永続化前に broadcast して問題ないか調べる
Expand Down
15 changes: 15 additions & 0 deletions src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,21 @@ private[raft] class RaftActor(
import RaftActor._
import context.dispatcher

if (isStickyLeader && log.isWarningEnabled) {
log.warning(
"This raft actor is defined as sticky leader. shard id = {} , role = {}, stickyLeaders = {}. sticky leaders are not able to failover to other multi-raft-roles. This means that some entities can be unavailable by some failures. Please unset sticky-leaders settings if there is no need.",
shardId.raw,
selfMemberIndex.role,
settings.stickyLeaders,
)
}

private def isStickyLeader: Boolean =
settings.stickyLeaders.get(this.shardId.raw).fold(false)(_ == this.selfMemberIndex.role)

protected def canBecomeCandidate: Boolean =
settings.stickyLeaders.get(this.shardId.raw).fold(true)(_ == this.selfMemberIndex.role)

protected[this] def shardId: NormalizedShardId = NormalizedShardId.from(self.path)

protected[this] def region: ActorRef = _region
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ trait RaftSettings {

def electionTimeout: FiniteDuration

def stickyLeaders: Map[String, String]

private[raft] def randomizedElectionTimeout(): FiniteDuration

def heartbeatInterval: FiniteDuration
Expand Down Expand Up @@ -85,6 +87,8 @@ trait RaftSettings {

private[entityreplication] def withDisabledShards(disabledShards: Set[String]): RaftSettings

private[entityreplication] def withStickyLeaders(stickyLeaders: Map[String, String]): RaftSettings

private[entityreplication] def withJournalPluginId(pluginId: String): RaftSettings

private[entityreplication] def withSnapshotPluginId(pluginId: String): RaftSettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import scala.util.Random
private[entityreplication] final case class RaftSettingsImpl(
config: Config,
electionTimeout: FiniteDuration,
stickyLeaders: Map[String, String],
heartbeatInterval: FiniteDuration,
electionTimeoutMin: Duration,
multiRaftRoles: Set[String],
Expand Down Expand Up @@ -58,6 +59,9 @@ private[entityreplication] final case class RaftSettingsImpl(
override private[entityreplication] def withDisabledShards(disabledShards: Set[String]): RaftSettings =
copy(disabledShards = disabledShards)

override private[entityreplication] def withStickyLeaders(stickyLeaders: Map[String, String]) =
copy(stickyLeaders = stickyLeaders)

override private[entityreplication] def withJournalPluginId(pluginId: String): RaftSettings =
copy(journalPluginId = pluginId)

Expand All @@ -83,6 +87,8 @@ private[entityreplication] object RaftSettingsImpl {

val electionTimeout: FiniteDuration = config.getDuration("election-timeout").toScala

val stickyLeaders: Map[String, String] = Map.empty

val heartbeatInterval: FiniteDuration = config.getDuration("heartbeat-interval").toScala

val electionTimeoutMin: Duration = electionTimeout * randomizedMinFactor
Expand Down Expand Up @@ -224,6 +230,7 @@ private[entityreplication] object RaftSettingsImpl {
RaftSettingsImpl(
config = config,
electionTimeout = electionTimeout,
stickyLeaders = stickyLeaders,
heartbeatInterval = heartbeatInterval,
electionTimeoutMin = electionTimeoutMin,
multiRaftRoles = multiRaftRoles,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ class ClusterReplicationSettingsSpec extends WordSpec with Matchers {
newSettings.raftSettings.disabledShards should be(Set("1", "3"))
}

"change value of raftSettings.stickyLeaders by withStickyLeaders" in {
val settings = ClusterReplicationSettingsImpl(config, correctClusterRoles.headOption.toSet)
val newSettings = settings.withStickyLeaders(Map("1" -> "replica-group-1"))
newSettings.raftSettings.stickyLeaders should be(Map("1" -> "replica-group-1"))
}

"change value of raftSettings.journalPluginId by withRaftJournalPluginId" in {
val settings = ClusterReplicationSettingsImpl(config, correctClusterRoles.headOption.toSet)
val expectedPluginId = "new-raft-journal-plugin-id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,7 @@ import akka.persistence.testkit.scaladsl.PersistenceTestKit
import akka.testkit.{ TestKit, TestProbe }
import lerna.akka.entityreplication.ReplicationRegion
import lerna.akka.entityreplication.model.{ EntityInstanceId, NormalizedEntityId, NormalizedShardId }
import lerna.akka.entityreplication.raft.RaftProtocol.{
Command,
ForwardedCommand,
ProcessCommand,
Replicate,
ReplicationFailed,
}
import lerna.akka.entityreplication.raft.RaftProtocol._
import lerna.akka.entityreplication.raft.model._
import lerna.akka.entityreplication.raft.protocol.RaftCommands._
import lerna.akka.entityreplication.raft.routing.MemberIndex
Expand Down Expand Up @@ -212,6 +206,105 @@ class RaftActorCandidateSpec
}
}

"send RequestVote on ElectionTimeout if a shard which it's belongs to doesn't have a sticky leader" in {
val shardId = createUniqueShardId()
val candidateMemberIndex = createUniqueMemberIndex()
val regionProbe = TestProbe()
val customSettings = RaftSettings(defaultRaftConfig).withStickyLeaders(Map.empty)
val candidate = createRaftActor(
shardId = shardId,
selfMemberIndex = candidateMemberIndex,
region = regionProbe.ref,
settings = customSettings,
)
val currentTerm = Term(2)
val candidateData = createCandidateData(currentTerm, ReplicatedLog())
setState(candidate, Candidate, candidateData)

// ElectionTimeout triggers that this candidate sends a RequestVote.
candidate ! ElectionTimeout
val expectedRequestVote =
RequestVote(
shardId,
term = Term(3),
candidate = candidateMemberIndex,
lastLogIndex = LogEntryIndex(0),
lastLogTerm = Term(0),
)
regionProbe.expectMsg(ReplicationRegion.Broadcast(expectedRequestVote))
awaitAssert {
assert(getState(candidate).stateName == Candidate)
val stateData = getState(candidate).stateData
assert(stateData.currentTerm == Term(3))
assert(stateData.votedFor.isEmpty)
assert(stateData.acceptedMembers.isEmpty)
}
}

"send RequestVote on ElectionTimeout if it is defined as sticky leader" in {
val shardId = createUniqueShardId()
val candidateMemberIndex = createUniqueMemberIndex()
val regionProbe = TestProbe()
val customSettings =
RaftSettings(defaultRaftConfig).withStickyLeaders(Map(shardId.raw -> candidateMemberIndex.role))
val candidate = createRaftActor(
shardId = shardId,
selfMemberIndex = candidateMemberIndex,
region = regionProbe.ref,
settings = customSettings,
)
val currentTerm = Term(2)
val candidateData = createCandidateData(currentTerm, ReplicatedLog())
setState(candidate, Candidate, candidateData)

// ElectionTimeout triggers that this candidate sends a RequestVote.
candidate ! ElectionTimeout
val expectedRequestVote =
RequestVote(
shardId,
term = Term(3),
candidate = candidateMemberIndex,
lastLogIndex = LogEntryIndex(0),
lastLogTerm = Term(0),
)
regionProbe.expectMsg(ReplicationRegion.Broadcast(expectedRequestVote))
awaitAssert {
assert(getState(candidate).stateName == Candidate)
val stateData = getState(candidate).stateData
assert(stateData.currentTerm == Term(3))
assert(stateData.votedFor.isEmpty)
assert(stateData.acceptedMembers.isEmpty)
}
}

"not send RequestVote on ElectionTimeout if another raft actor which belongs same shard is defined as a sticky leader" in {
val shardId = createUniqueShardId()
val candidateMemberIndex = createUniqueMemberIndex()
val regionProbe = TestProbe()
val customSettings =
RaftSettings(defaultRaftConfig).withStickyLeaders(Map(s"${shardId.raw}" -> "other-actors-role"))
val candidate = createRaftActor(
shardId = shardId,
selfMemberIndex = candidateMemberIndex,
region = regionProbe.ref,
settings = customSettings,
)
val currentTerm = Term(2)
val candidateData = createCandidateData(currentTerm, ReplicatedLog())
setState(candidate, Candidate, candidateData)

// ElectionTimeout should not triggers that this candidate sends a RequestVote.
candidate ! ElectionTimeout
regionProbe.expectNoMessage()
awaitAssert {
assert(getState(candidate).stateName == Candidate)
val stateData = getState(candidate).stateData
assert(stateData.currentTerm == Term(2))
assert(stateData.votedFor.isEmpty)
assert(stateData.acceptedMembers.isEmpty)
}
}

"メンバーの過半数に Accept されると Leader になる" in {
val selfMemberIndex = createUniqueMemberIndex()
val follower1MemberIndex = createUniqueMemberIndex()
Expand Down
Loading