Skip to content

Commit

Permalink
Merge pull request #200 from lerna-stack/non-leaders-passivate-entiti…
Browse files Browse the repository at this point in the history
…es-eventually

Non-leaders passivate entities eventually
  • Loading branch information
negokaz authored Apr 12, 2023
2 parents 05bacf2 + a59fc59 commit 9a21949
Show file tree
Hide file tree
Showing 13 changed files with 743 additions and 37 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
[Unreleased]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.2.0...master

### Fixed
- Dead letters about ReplicationRegion$Passivate continue
[#199](https://github.com/lerna-stack/akka-entity-replication/issues/199),
[PR#200](https://github.com/lerna-stack/akka-entity-replication/pull/200)


## [v2.2.0] - 2022-12-26
[v2.2.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.1.0...v2.2.0
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/protobuf/cluster_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,21 @@ message TryCreateEntity {
required NormalizedEntityId entity_id = 2;
}

// Sent by a non-leader RaftActor (follower or candidate) to the leader to ask
// whether it may passivate (stop) its local entity.
// The leader answers with EntityPassivationPermitted or EntityPassivationDenied.
message EntityPassivationPermitRequest {
// Shard of the RaftActor that issues the request.
required NormalizedShardId shard_id = 1;
// Entity that asked to be passivated.
required NormalizedEntityId entity_id = 2;
// Message to deliver to the entity to stop it; the leader echoes it back when it permits passivation.
required Payload stop_message = 3;
}

// Leader's positive reply to EntityPassivationPermitRequest:
// the requesting non-leader may stop the entity using stop_message.
message EntityPassivationPermitted {
// Entity whose passivation is permitted.
required NormalizedEntityId entity_id = 1;
// Stop message copied from the originating request.
required Payload stop_message = 2;
}

// Leader's negative reply to EntityPassivationPermitRequest:
// the leader's entity is still running, so the non-leader must keep its entity.
message EntityPassivationDenied {
// Entity whose passivation is denied.
required NormalizedEntityId entity_id = 1;
}


// ===
// raft.snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,19 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
val CommitLogStoreAppendCommittedEntriesManifest = "BD"
val CommitLogStoreAppendCommittedEntriesResponseManifest = "BE"
// raft.protocol
val RequestVoteManifest = "CA"
val RequestVoteAcceptedManifest = "CB"
val RequestVoteDeniedManifest = "CC"
val AppendEntriesManifest = "CD"
val AppendEntriesSucceededManifest = "CE"
val AppendEntriesFailedManifest = "CF"
val InstallSnapshotManifest = "CG"
val InstallSnapshotSucceededManifest = "CH"
val SuspendEntityManifest = "CI"
val TryCreateEntityManifest = "CJ"
val RequestVoteManifest = "CA"
val RequestVoteAcceptedManifest = "CB"
val RequestVoteDeniedManifest = "CC"
val AppendEntriesManifest = "CD"
val AppendEntriesSucceededManifest = "CE"
val AppendEntriesFailedManifest = "CF"
val InstallSnapshotManifest = "CG"
val InstallSnapshotSucceededManifest = "CH"
val SuspendEntityManifest = "CI"
val TryCreateEntityManifest = "CJ"
val EntityPassivationPermitRequestManifest = "CK"
val EntityPassivationPermittedManifest = "CL"
val EntityPassivationDeniedManifest = "CM"
// raft.snapshot
val EntitySnapshotManifest = "DA"
// raft.snapshot.sync
Expand Down Expand Up @@ -85,16 +88,19 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
CommitLogStoreAppendCommittedEntriesManifest -> commitLogStoreAppendCommittedEntriesFromBinary,
CommitLogStoreAppendCommittedEntriesResponseManifest -> commitLogStoreAppendCommittedEntriesResponseFromBinary,
// raft.protocol
RequestVoteManifest -> requestVoteFromBinary,
RequestVoteAcceptedManifest -> requestVoteAcceptedFromBinary,
RequestVoteDeniedManifest -> requestVoteDeniedFromBinary,
AppendEntriesManifest -> appendEntriesFromBinary,
AppendEntriesSucceededManifest -> appendEntriesSucceededFromBinary,
AppendEntriesFailedManifest -> appendEntriesFailedFromBinary,
InstallSnapshotManifest -> installSnapshotFromBinary,
InstallSnapshotSucceededManifest -> installSnapshotSucceededFromBinary,
SuspendEntityManifest -> suspendEntityFromBinary,
TryCreateEntityManifest -> tryCreateEntityFromBinary,
RequestVoteManifest -> requestVoteFromBinary,
RequestVoteAcceptedManifest -> requestVoteAcceptedFromBinary,
RequestVoteDeniedManifest -> requestVoteDeniedFromBinary,
AppendEntriesManifest -> appendEntriesFromBinary,
AppendEntriesSucceededManifest -> appendEntriesSucceededFromBinary,
AppendEntriesFailedManifest -> appendEntriesFailedFromBinary,
InstallSnapshotManifest -> installSnapshotFromBinary,
InstallSnapshotSucceededManifest -> installSnapshotSucceededFromBinary,
SuspendEntityManifest -> suspendEntityFromBinary,
TryCreateEntityManifest -> tryCreateEntityFromBinary,
EntityPassivationPermitRequestManifest -> entityPassivationPermitRequestFromBinary,
EntityPassivationPermittedManifest -> entityPassivationPermittedFromBinary,
EntityPassivationDeniedManifest -> entityPassivationDeniedFromBinary,
// raft.snapshot
EntitySnapshotManifest -> entitySnapshotFromBinary,
// raft.snapshot.sync
Expand Down Expand Up @@ -171,6 +177,9 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
case _: raft.protocol.RaftCommands.InstallSnapshotSucceeded => InstallSnapshotSucceededManifest
case _: raft.protocol.SuspendEntity => SuspendEntityManifest
case _: raft.protocol.TryCreateEntity => TryCreateEntityManifest
case _: raft.protocol.EntityPassivationPermitRequest => EntityPassivationPermitRequestManifest
case _: raft.protocol.EntityPassivationPermitted => EntityPassivationPermittedManifest
case _: raft.protocol.EntityPassivationDenied => EntityPassivationDeniedManifest
// raft.snapshot
case _: raft.snapshot.SnapshotProtocol.EntitySnapshot => EntitySnapshotManifest
// raft.snapshot.sync
Expand Down Expand Up @@ -219,6 +228,9 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
case m: raft.protocol.RaftCommands.InstallSnapshotSucceeded => installSnapshotSucceededToBinary(m)
case m: raft.protocol.SuspendEntity => suspendEntityToBinary(m)
case m: raft.protocol.TryCreateEntity => tryCreateEntityToBinary(m)
case m: raft.protocol.EntityPassivationPermitRequest => entityPassivationPermitRequestToBinary(m)
case m: raft.protocol.EntityPassivationPermitted => entityPassivationPermittedToBinary(m)
case m: raft.protocol.EntityPassivationDenied => entityPassivationDeniedToBinary(m)
// raft.snapshot
case m: raft.snapshot.SnapshotProtocol.EntitySnapshot => entitySnapShotToBinary(m)
// raft.snapshot.sync
Expand Down Expand Up @@ -728,6 +740,66 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
)
}

/** Converts an `EntityPassivationPermitRequest` to the bytes of its `msg.EntityPassivationPermitRequest` counterpart. */
private def entityPassivationPermitRequestToBinary(
    message: raft.protocol.EntityPassivationPermitRequest,
): Array[Byte] = {
  // Convert each field first (shard, entity, stop message), then assemble the proto message.
  val shardIdProto     = normalizedShardIdToProto(message.shardId)
  val entityIdProto    = normalizedEntityIdToProto(message.entityId)
  val stopMessageProto = payloadToProto(message.stopMessage)
  val proto = msg.EntityPassivationPermitRequest.of(
    shardId = shardIdProto,
    entityId = entityIdProto,
    stopMessage = stopMessageProto,
  )
  proto.toByteArray
}

/** Parses bytes as `msg.EntityPassivationPermitRequest` and rebuilds the corresponding `EntityPassivationPermitRequest`. */
private def entityPassivationPermitRequestFromBinary(
    bytes: Array[Byte],
): raft.protocol.EntityPassivationPermitRequest = {
  val parsed = msg.EntityPassivationPermitRequest.parseFrom(bytes)
  // Convert each proto field back to its model representation, in field order.
  val restoredShardId     = normalizedShardIdFromProto(parsed.shardId)
  val restoredEntityId    = normalizedEntityIdFromProto(parsed.entityId)
  val restoredStopMessage = payloadFromProto(parsed.stopMessage)
  raft.protocol.EntityPassivationPermitRequest(
    shardId = restoredShardId,
    entityId = restoredEntityId,
    stopMessage = restoredStopMessage,
  )
}

/** Converts an `EntityPassivationPermitted` to the bytes of its `msg.EntityPassivationPermitted` counterpart. */
private def entityPassivationPermittedToBinary(
    message: raft.protocol.EntityPassivationPermitted,
): Array[Byte] = {
  val entityIdProto    = normalizedEntityIdToProto(message.entityId)
  val stopMessageProto = payloadToProto(message.stopMessage)
  val proto = msg.EntityPassivationPermitted.of(
    entityId = entityIdProto,
    stopMessage = stopMessageProto,
  )
  proto.toByteArray
}

/** Parses bytes as `msg.EntityPassivationPermitted` and rebuilds the corresponding `EntityPassivationPermitted`. */
private def entityPassivationPermittedFromBinary(
    bytes: Array[Byte],
): raft.protocol.EntityPassivationPermitted = {
  val parsed              = msg.EntityPassivationPermitted.parseFrom(bytes)
  val restoredEntityId    = normalizedEntityIdFromProto(parsed.entityId)
  val restoredStopMessage = payloadFromProto(parsed.stopMessage)
  raft.protocol.EntityPassivationPermitted(
    entityId = restoredEntityId,
    stopMessage = restoredStopMessage,
  )
}

/** Converts an `EntityPassivationDenied` to the bytes of its `msg.EntityPassivationDenied` counterpart. */
private def entityPassivationDeniedToBinary(
    message: raft.protocol.EntityPassivationDenied,
): Array[Byte] = {
  val entityIdProto = normalizedEntityIdToProto(message.entityId)
  msg.EntityPassivationDenied.of(entityId = entityIdProto).toByteArray
}

/** Parses bytes as `msg.EntityPassivationDenied` and rebuilds the corresponding `EntityPassivationDenied`. */
private def entityPassivationDeniedFromBinary(
    bytes: Array[Byte],
): raft.protocol.EntityPassivationDenied = {
  val parsed           = msg.EntityPassivationDenied.parseFrom(bytes)
  val restoredEntityId = normalizedEntityIdFromProto(parsed.entityId)
  raft.protocol.EntityPassivationDenied(entityId = restoredEntityId)
}

// ===
// raft.snapshot
// ===
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package lerna.akka.entityreplication.raft

import lerna.akka.entityreplication.ReplicationRegion
import lerna.akka.entityreplication.raft.RaftProtocol._
import lerna.akka.entityreplication.raft.eventsourced.CommitLogStoreActor
import lerna.akka.entityreplication.raft.protocol.RaftCommands._
import lerna.akka.entityreplication.raft.protocol.{ FetchEntityEvents, SuspendEntity, TryCreateEntity }
import lerna.akka.entityreplication.raft.protocol._
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol
import lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager

Expand All @@ -21,6 +22,9 @@ private[raft] trait Candidate { this: RaftActor =>
case command: Command => handleCommand(command)
case _: ForwardedCommand => // ignore, because I'm not a leader
case replicate: Replicate => receiveReplicate(replicate)
case passivate: ReplicationRegion.Passivate => handlePassivateForNonLeader(passivate)
case _: EntityPassivationPermitRequest => // ignore, because I'm not a leader
case response: EntityPassivationPermitResponse => handleEntityPassivationPermitResponse(response)
case TryCreateEntity(_, entityId) => createEntityIfNotExists(entityId)
case request: FetchEntityEvents => receiveFetchEntityEvents(request)
case EntityTerminated(id) => receiveEntityTerminated(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package lerna.akka.entityreplication.raft

import lerna.akka.entityreplication.raft.RaftProtocol._
import lerna.akka.entityreplication.raft.protocol.RaftCommands._
import lerna.akka.entityreplication.raft.protocol.{ FetchEntityEvents, SuspendEntity, TryCreateEntity }
import lerna.akka.entityreplication.raft.protocol._
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol
import lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager
import lerna.akka.entityreplication.ReplicationRegion
Expand All @@ -21,6 +21,9 @@ private[raft] trait Follower { this: RaftActor =>
case command: Command => handleCommand(command)
case _: ForwardedCommand => // ignore, because I'm not a leader
case replicate: Replicate => receiveReplicate(replicate)
case passivate: ReplicationRegion.Passivate => handlePassivateForNonLeader(passivate)
case _: EntityPassivationPermitRequest => // ignore, because I'm not a leader
case response: EntityPassivationPermitResponse => handleEntityPassivationPermitResponse(response)
case TryCreateEntity(_, entityId) => createEntityIfNotExists(entityId)
case request: FetchEntityEvents => receiveFetchEntityEvents(request)
case EntityTerminated(id) => receiveEntityTerminated(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import lerna.akka.entityreplication.model.NormalizedEntityId
import lerna.akka.entityreplication.raft.RaftProtocol._
import lerna.akka.entityreplication.raft.model._
import lerna.akka.entityreplication.raft.protocol.RaftCommands._
import lerna.akka.entityreplication.raft.protocol.{ FetchEntityEvents, SuspendEntity, TryCreateEntity }
import lerna.akka.entityreplication.raft.protocol._
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol
import lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager
import lerna.akka.entityreplication.ReplicationRegion
Expand All @@ -31,6 +31,8 @@ private[raft] trait Leader { this: RaftActor =>
case request: Replicate => replicate(request)
case response: ReplicationResponse => receiveReplicationResponse(response)
case ReplicationRegion.Passivate(entityPath, stopMessage) => startEntityPassivationProcess(entityPath, stopMessage)
case request: EntityPassivationPermitRequest => handleEntityPassivationPermitRequest(request)
case _: EntityPassivationPermitResponse => // ignore, because I'm the leader
case TryCreateEntity(_, entityId) => createEntityIfNotExists(entityId)
case request: FetchEntityEvents => receiveFetchEntityEvents(request)
case EntityTerminated(id) => receiveEntityTerminated(id)
Expand Down Expand Up @@ -269,6 +271,36 @@ private[raft] trait Leader { this: RaftActor =>
broadcast(SuspendEntity(shardId, NormalizedEntityId.of(entityPath), stopMessage))
}

/** Answers a passivation permit request sent by a non-leader.
  *
  * Replies to the sender with `EntityPassivationPermitted` (echoing the requested stop message) when the
  * leader's entity is terminated, and with `EntityPassivationDenied` when the leader's entity is running.
  */
private def handleEntityPassivationPermitRequest(request: EntityPassivationPermitRequest): Unit = {
  val requester = sender()
  // NOTE:
  // The leader may permit passivation by mistake if its entity has just terminated with a failure.
  // That is recoverable: the leader detects the termination of its own entity by death watch and then
  // asks the non-leaders to start their entities again.
  val leaderEntityTerminated = isEntityTerminated(request.entityId)
  if (log.isDebugEnabled) {
    val decision =
      if (leaderEntityTerminated) " Replying with a passivation permit since the leader's entity is terminated."
      else " Replying with a passivation denial since the leader's entity is running."
    log.debug(
      "[Leader] got a passivation permit request (entityId=[{}], stopMessage=[{}]) from [{}]." + decision,
      request.entityId,
      request.stopMessage.getClass.getName,
      requester,
    )
  }
  val reply =
    if (leaderEntityTerminated) EntityPassivationPermitted(request.entityId, request.stopMessage)
    else EntityPassivationDenied(request.entityId)
  requester ! reply
}

private[this] def publishAppendEntries(): Unit = {
resetHeartbeatTimeoutTimer()
otherMemberIndexes.foreach { memberIndex =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import lerna.akka.entityreplication.model.{ NormalizedEntityId, NormalizedShardI
import lerna.akka.entityreplication.raft.RaftProtocol.{ Replicate, _ }
import lerna.akka.entityreplication.raft.eventsourced.CommitLogStoreActor
import lerna.akka.entityreplication.raft.model._
import lerna.akka.entityreplication.raft.protocol.{ FetchEntityEvents, FetchEntityEventsResponse }
import lerna.akka.entityreplication.raft.protocol._
import lerna.akka.entityreplication.raft.protocol.RaftCommands._
import lerna.akka.entityreplication.raft.routing.MemberIndex
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol
Expand Down Expand Up @@ -189,6 +189,11 @@ private[raft] class RaftActor(

protected def commitLogStore: ActorRef = _commitLogStore

/** Tells whether the given entity is terminated.
  *
  * An entity counts as terminated when this actor has no child named after the entity ID.
  *
  * @return `true` if the entity is terminated, `false` otherwise
  */
protected def isEntityTerminated(entityId: NormalizedEntityId): Boolean = {
  val entityChild = context.child(entityId.underlying)
  !entityChild.isDefined
}

/** Ensures the entity for `entityId` exists by delegating to `replicationActor`; the result is discarded.
  * NOTE(review): presumably `replicationActor` looks up or spawns the entity's child actor — confirm against its definition.
  */
protected[this] def createEntityIfNotExists(entityId: NormalizedEntityId): Unit = {
  replicationActor(entityId)
  ()
}

protected def receiveFetchEntityEvents(request: FetchEntityEvents): Unit = {
Expand Down Expand Up @@ -396,12 +401,71 @@ private[raft] class RaftActor(
case _ => stash()
}

/** Handles a `ReplicationRegion.Passivate` received while this RaftActor is not the leader.
  *
  * When a leader is known, asks it for permission by delivering an `EntityPassivationPermitRequest`
  * through the region. When no leader is known, the `Passivate` message is dropped (only a debug log is written).
  */
protected def handlePassivateForNonLeader(passivate: ReplicationRegion.Passivate): Unit = {
  currentData.leaderMember match {
    case None =>
      // Nobody can grant the permit right now; the entity is expected to ask again later.
      if (log.isDebugEnabled) {
        log.debug(
          "=== [{}] dropped a Passivate message (entityPath=[{}], stopMessage=[{}]) since there was no leader." +
          " The entity should request passivation later again. ===",
          currentState,
          passivate.entityPath,
          passivate.stopMessage.getClass.getName,
        )
      }
    case Some(leader) =>
      assert(leader != selfMemberIndex, s"This RaftActor [$this] should be non-leader.")
      val permitRequest = EntityPassivationPermitRequest(
        shardId,
        NormalizedEntityId.of(passivate.entityPath),
        passivate.stopMessage,
      )
      if (log.isDebugEnabled) {
        // currentState is interpolated into the template, so only four placeholder arguments are passed.
        log.debug(
          s"=== [$currentState] sending a passivation request (shardId=[{}], entityId=[{}], stopMessage=[{}]) to the leader [{}]. ===",
          permitRequest.shardId,
          permitRequest.entityId,
          permitRequest.stopMessage.getClass.getName,
          leader,
        )
      }
      val delivery = ReplicationRegion.DeliverTo(leader, permitRequest)
      region ! delivery
  }
}

/** Handles the leader's answer to a passivation permit request.
  *
  * On a permit, suspends the entity with the stop message carried by the permit.
  * On a denial, does nothing besides debug logging.
  */
protected def handleEntityPassivationPermitResponse(response: EntityPassivationPermitResponse): Unit =
  response match {
    case denied: EntityPassivationDenied =>
      if (log.isDebugEnabled) {
        log.debug(
          "=== [{}] got a passivation denial (entityId=[{}]) from the leader [{}]. ===",
          currentState,
          denied.entityId,
          sender(),
        )
      }
    case permitted: EntityPassivationPermitted =>
      if (log.isDebugEnabled) {
        log.debug(
          "=== [{}] got a passivation permit (entityId=[{}], stopMessage=[{}]) from the leader [{}]. ===",
          currentState,
          permitted.entityId,
          permitted.stopMessage.getClass.getName,
          sender(),
        )
      }
      suspendEntity(permitted.entityId, permitted.stopMessage)
  }

/** Reacts to the termination of the entity identified by `entityId`.
  *
  * If the entity's state is passivating, applies a `TerminatedEntity` domain event.
  * Otherwise the termination was not requested: the entity is restarted via `replicationActor`, and,
  * when this actor is the leader, `TryCreateEntity` is broadcast to the other members.
  */
def receiveEntityTerminated(entityId: NormalizedEntityId): Unit = {
  val passivating = currentData.entityStateOf(entityId).isPassivating
  if (!passivating) {
    // restart
    replicationActor(entityId)
    val iAmLeader = currentState == Leader
    if (iAmLeader) {
      // The leader could have permitted passivation by mistake, so the entities of non-leaders
      // (followers or candidates) might be stopped. Recover by asking them to start their entities.
      broadcastWithoutSelf(TryCreateEntity(shardId, entityId))
    }
  } else {
    applyDomainEvent(TerminatedEntity(entityId)) { _ => () }
  }
}

Expand Down Expand Up @@ -512,6 +576,12 @@ private[raft] class RaftActor(
region ! ReplicationRegion.Broadcast(message)
}

/** Sends `message` to the other members by handing a `ReplicationRegion.BroadcastWithoutSelf` envelope to the region. */
private def broadcastWithoutSelf(message: Any): Unit = {
  if (log.isDebugEnabled) {
    log.debug("=== [{}] broadcast {} without self ===", currentState, message)
  }
  val envelope = ReplicationRegion.BroadcastWithoutSelf(message)
  region ! envelope
}

def applyToReplicationActor(logEntry: LogEntry): Unit =
logEntry.event match {
case EntityEvent(_, NoOp) => // NoOp は replicationActor には関係ないので転送しない
Expand Down
Loading

0 comments on commit 9a21949

Please sign in to comment.