Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Non-leaders passivate entities eventually #200

Merged
merged 6 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
[Unreleased]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.2.0...master

### Fixed
- Dead letters about ReplicationRegion$Passivate continue
[#199](https://github.com/lerna-stack/akka-entity-replication/issues/199),
[PR#200](https://github.com/lerna-stack/akka-entity-replication/pull/200)


## [v2.2.0] - 2022-012-26
[v2.2.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.1.0...v2.2.0
Expand Down
15 changes: 15 additions & 0 deletions core/src/main/protobuf/cluster_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,21 @@ message TryCreateEntity {
required NormalizedEntityId entity_id = 2;
}

message EntityPassivationPermitRequest {
required NormalizedShardId shard_id = 1;
required NormalizedEntityId entity_id = 2;
required Payload stop_message = 3;
}

message EntityPassivationPermitted {
required NormalizedEntityId entity_id = 1;
required Payload stop_message = 2;
}

message EntityPassivationDenied {
required NormalizedEntityId entity_id = 1;
}


// ===
// raft.snapshot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,19 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
val CommitLogStoreAppendCommittedEntriesManifest = "BD"
val CommitLogStoreAppendCommittedEntriesResponseManifest = "BE"
// raft.protocol
val RequestVoteManifest = "CA"
val RequestVoteAcceptedManifest = "CB"
val RequestVoteDeniedManifest = "CC"
val AppendEntriesManifest = "CD"
val AppendEntriesSucceededManifest = "CE"
val AppendEntriesFailedManifest = "CF"
val InstallSnapshotManifest = "CG"
val InstallSnapshotSucceededManifest = "CH"
val SuspendEntityManifest = "CI"
val TryCreateEntityManifest = "CJ"
val RequestVoteManifest = "CA"
val RequestVoteAcceptedManifest = "CB"
val RequestVoteDeniedManifest = "CC"
val AppendEntriesManifest = "CD"
val AppendEntriesSucceededManifest = "CE"
val AppendEntriesFailedManifest = "CF"
val InstallSnapshotManifest = "CG"
val InstallSnapshotSucceededManifest = "CH"
val SuspendEntityManifest = "CI"
val TryCreateEntityManifest = "CJ"
val EntityPassivationPermitRequestManifest = "CK"
val EntityPassivationPermittedManifest = "CL"
val EntityPassivationDeniedManifest = "CM"
// raft.snapshot
val EntitySnapshotManifest = "DA"
// raft.snapshot.sync
Expand Down Expand Up @@ -85,16 +88,19 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
CommitLogStoreAppendCommittedEntriesManifest -> commitLogStoreAppendCommittedEntriesFromBinary,
CommitLogStoreAppendCommittedEntriesResponseManifest -> commitLogStoreAppendCommittedEntriesResponseFromBinary,
// raft.protocol
RequestVoteManifest -> requestVoteFromBinary,
RequestVoteAcceptedManifest -> requestVoteAcceptedFromBinary,
RequestVoteDeniedManifest -> requestVoteDeniedFromBinary,
AppendEntriesManifest -> appendEntriesFromBinary,
AppendEntriesSucceededManifest -> appendEntriesSucceededFromBinary,
AppendEntriesFailedManifest -> appendEntriesFailedFromBinary,
InstallSnapshotManifest -> installSnapshotFromBinary,
InstallSnapshotSucceededManifest -> installSnapshotSucceededFromBinary,
SuspendEntityManifest -> suspendEntityFromBinary,
TryCreateEntityManifest -> tryCreateEntityFromBinary,
RequestVoteManifest -> requestVoteFromBinary,
RequestVoteAcceptedManifest -> requestVoteAcceptedFromBinary,
RequestVoteDeniedManifest -> requestVoteDeniedFromBinary,
AppendEntriesManifest -> appendEntriesFromBinary,
AppendEntriesSucceededManifest -> appendEntriesSucceededFromBinary,
AppendEntriesFailedManifest -> appendEntriesFailedFromBinary,
InstallSnapshotManifest -> installSnapshotFromBinary,
InstallSnapshotSucceededManifest -> installSnapshotSucceededFromBinary,
SuspendEntityManifest -> suspendEntityFromBinary,
TryCreateEntityManifest -> tryCreateEntityFromBinary,
EntityPassivationPermitRequestManifest -> entityPassivationPermitRequestFromBinary,
EntityPassivationPermittedManifest -> entityPassivationPermittedFromBinary,
EntityPassivationDeniedManifest -> entityPassivationDeniedFromBinary,
// raft.snapshot
EntitySnapshotManifest -> entitySnapshotFromBinary,
// raft.snapshot.sync
Expand Down Expand Up @@ -171,6 +177,9 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
case _: raft.protocol.RaftCommands.InstallSnapshotSucceeded => InstallSnapshotSucceededManifest
case _: raft.protocol.SuspendEntity => SuspendEntityManifest
case _: raft.protocol.TryCreateEntity => TryCreateEntityManifest
case _: raft.protocol.EntityPassivationPermitRequest => EntityPassivationPermitRequestManifest
case _: raft.protocol.EntityPassivationPermitted => EntityPassivationPermittedManifest
case _: raft.protocol.EntityPassivationDenied => EntityPassivationDeniedManifest
// raft.snapsnot
case _: raft.snapshot.SnapshotProtocol.EntitySnapshot => EntitySnapshotManifest
// raft.snapshot.sync
Expand Down Expand Up @@ -219,6 +228,9 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
case m: raft.protocol.RaftCommands.InstallSnapshotSucceeded => installSnapshotSucceededToBinary(m)
case m: raft.protocol.SuspendEntity => suspendEntityToBinary(m)
case m: raft.protocol.TryCreateEntity => tryCreateEntityToBinary(m)
case m: raft.protocol.EntityPassivationPermitRequest => entityPassivationPermitRequestToBinary(m)
case m: raft.protocol.EntityPassivationPermitted => entityPassivationPermittedToBinary(m)
case m: raft.protocol.EntityPassivationDenied => entityPassivationDeniedToBinary(m)
// raft.snapshot
case m: raft.snapshot.SnapshotProtocol.EntitySnapshot => entitySnapShotToBinary(m)
// raft.snapshot.sync
Expand Down Expand Up @@ -728,6 +740,66 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
)
}

private def entityPassivationPermitRequestToBinary(
message: raft.protocol.EntityPassivationPermitRequest,
): Array[Byte] = {
msg.EntityPassivationPermitRequest
.of(
shardId = normalizedShardIdToProto(message.shardId),
entityId = normalizedEntityIdToProto(message.entityId),
stopMessage = payloadToProto(message.stopMessage),
).toByteArray
}

private def entityPassivationPermitRequestFromBinary(
bytes: Array[Byte],
): raft.protocol.EntityPassivationPermitRequest = {
val proto = msg.EntityPassivationPermitRequest.parseFrom(bytes)
raft.protocol.EntityPassivationPermitRequest(
shardId = normalizedShardIdFromProto(proto.shardId),
entityId = normalizedEntityIdFromProto(proto.entityId),
stopMessage = payloadFromProto(proto.stopMessage),
)
}

private def entityPassivationPermittedToBinary(
message: raft.protocol.EntityPassivationPermitted,
): Array[Byte] = {
msg.EntityPassivationPermitted
.of(
entityId = normalizedEntityIdToProto(message.entityId),
stopMessage = payloadToProto(message.stopMessage),
).toByteArray
}

private def entityPassivationPermittedFromBinary(
bytes: Array[Byte],
): raft.protocol.EntityPassivationPermitted = {
val proto = msg.EntityPassivationPermitted.parseFrom(bytes)
raft.protocol.EntityPassivationPermitted(
entityId = normalizedEntityIdFromProto(proto.entityId),
stopMessage = payloadFromProto(proto.stopMessage),
)
}

private def entityPassivationDeniedToBinary(
message: raft.protocol.EntityPassivationDenied,
): Array[Byte] = {
msg.EntityPassivationDenied
.of(
entityId = normalizedEntityIdToProto(message.entityId),
).toByteArray
}

private def entityPassivationDeniedFromBinary(
bytes: Array[Byte],
): raft.protocol.EntityPassivationDenied = {
val proto = msg.EntityPassivationDenied.parseFrom(bytes)
raft.protocol.EntityPassivationDenied(
entityId = normalizedEntityIdFromProto(proto.entityId),
)
}

// ===
// raft.snapshot
// ===
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package lerna.akka.entityreplication.raft

import lerna.akka.entityreplication.ReplicationRegion
import lerna.akka.entityreplication.raft.RaftProtocol._
import lerna.akka.entityreplication.raft.eventsourced.CommitLogStoreActor
import lerna.akka.entityreplication.raft.protocol.RaftCommands._
import lerna.akka.entityreplication.raft.protocol.{ FetchEntityEvents, SuspendEntity, TryCreateEntity }
import lerna.akka.entityreplication.raft.protocol._
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol
import lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager

Expand All @@ -21,6 +22,9 @@ private[raft] trait Candidate { this: RaftActor =>
case command: Command => handleCommand(command)
case _: ForwardedCommand => // ignore, because I'm not a leader
case replicate: Replicate => receiveReplicate(replicate)
case passivate: ReplicationRegion.Passivate => handlePassivateForNonLeader(passivate)
case _: EntityPassivationPermitRequest => // ignore, because I'm not a leader
case response: EntityPassivationPermitResponse => handleEntityPassivationPermitResponse(response)
case TryCreateEntity(_, entityId) => createEntityIfNotExists(entityId)
case request: FetchEntityEvents => receiveFetchEntityEvents(request)
case EntityTerminated(id) => receiveEntityTerminated(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package lerna.akka.entityreplication.raft

import lerna.akka.entityreplication.raft.RaftProtocol._
import lerna.akka.entityreplication.raft.protocol.RaftCommands._
import lerna.akka.entityreplication.raft.protocol.{ FetchEntityEvents, SuspendEntity, TryCreateEntity }
import lerna.akka.entityreplication.raft.protocol._
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol
import lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager
import lerna.akka.entityreplication.ReplicationRegion
Expand All @@ -21,6 +21,9 @@ private[raft] trait Follower { this: RaftActor =>
case command: Command => handleCommand(command)
case _: ForwardedCommand => // ignore, because I'm not a leader
case replicate: Replicate => receiveReplicate(replicate)
case passivate: ReplicationRegion.Passivate => handlePassivateForNonLeader(passivate)
case _: EntityPassivationPermitRequest => // ignore, because I'm not a leader
case response: EntityPassivationPermitResponse => handleEntityPassivationPermitResponse(response)
case TryCreateEntity(_, entityId) => createEntityIfNotExists(entityId)
case request: FetchEntityEvents => receiveFetchEntityEvents(request)
case EntityTerminated(id) => receiveEntityTerminated(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import lerna.akka.entityreplication.model.NormalizedEntityId
import lerna.akka.entityreplication.raft.RaftProtocol._
import lerna.akka.entityreplication.raft.model._
import lerna.akka.entityreplication.raft.protocol.RaftCommands._
import lerna.akka.entityreplication.raft.protocol.{ FetchEntityEvents, SuspendEntity, TryCreateEntity }
import lerna.akka.entityreplication.raft.protocol._
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol
import lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager
import lerna.akka.entityreplication.ReplicationRegion
Expand All @@ -31,6 +31,8 @@ private[raft] trait Leader { this: RaftActor =>
case request: Replicate => replicate(request)
case response: ReplicationResponse => receiveReplicationResponse(response)
case ReplicationRegion.Passivate(entityPath, stopMessage) => startEntityPassivationProcess(entityPath, stopMessage)
case request: EntityPassivationPermitRequest => handleEntityPassivationPermitRequest(request)
case _: EntityPassivationPermitResponse => // ignore, because I'm the leader
case TryCreateEntity(_, entityId) => createEntityIfNotExists(entityId)
case request: FetchEntityEvents => receiveFetchEntityEvents(request)
case EntityTerminated(id) => receiveEntityTerminated(id)
Expand Down Expand Up @@ -269,6 +271,36 @@ private[raft] trait Leader { this: RaftActor =>
broadcast(SuspendEntity(shardId, NormalizedEntityId.of(entityPath), stopMessage))
}

private def handleEntityPassivationPermitRequest(request: EntityPassivationPermitRequest): Unit = {
if (isEntityTerminated(request.entityId)) {
// NOTE:
// The leader could permit passivation mistakenly if the entity has been terminated with a failure at this time.
// The leader can recover from this mistake. It will detect the leader's entity termination by death watch and then
// request non-leaders such that the non-leaders entities start.
if (log.isDebugEnabled) {
log.debug(
"[Leader] got a passivation permit request (entityId=[{}], stopMessage=[{}]) from [{}]." +
" Replying with a passivation permit since the leader's entity is terminated.",
request.entityId,
request.stopMessage.getClass.getName,
sender(),
)
}
sender() ! EntityPassivationPermitted(request.entityId, request.stopMessage)
} else {
if (log.isDebugEnabled) {
log.debug(
"[Leader] got a passivation permit request (entityId=[{}], stopMessage=[{}]) from [{}]." +
" Replying with a passivation denial since the leader's entity is running.",
request.entityId,
request.stopMessage.getClass.getName,
sender(),
)
}
sender() ! EntityPassivationDenied(request.entityId)
}
}

private[this] def publishAppendEntries(): Unit = {
resetHeartbeatTimeoutTimer()
otherMemberIndexes.foreach { memberIndex =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import lerna.akka.entityreplication.model.{ NormalizedEntityId, NormalizedShardI
import lerna.akka.entityreplication.raft.RaftProtocol.{ Replicate, _ }
import lerna.akka.entityreplication.raft.eventsourced.CommitLogStoreActor
import lerna.akka.entityreplication.raft.model._
import lerna.akka.entityreplication.raft.protocol.{ FetchEntityEvents, FetchEntityEventsResponse }
import lerna.akka.entityreplication.raft.protocol._
import lerna.akka.entityreplication.raft.protocol.RaftCommands._
import lerna.akka.entityreplication.raft.routing.MemberIndex
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol
Expand Down Expand Up @@ -189,6 +189,11 @@ private[raft] class RaftActor(

protected def commitLogStore: ActorRef = _commitLogStore

/** Returns `true` if the given entity is terminated, `false` otherwise. */
protected def isEntityTerminated(entityId: NormalizedEntityId): Boolean = {
context.child(entityId.underlying).isEmpty
}

protected[this] def createEntityIfNotExists(entityId: NormalizedEntityId): Unit = replicationActor(entityId)

protected def receiveFetchEntityEvents(request: FetchEntityEvents): Unit = {
Expand Down Expand Up @@ -396,12 +401,71 @@ private[raft] class RaftActor(
case _ => stash()
}

protected def handlePassivateForNonLeader(passivate: ReplicationRegion.Passivate): Unit = {
currentData.leaderMember match {
case Some(leaderMemberIndex) =>
assert(leaderMemberIndex != selfMemberIndex, s"This RaftActor [$this] should be non-leader.")
val passivationPermitRequest =
EntityPassivationPermitRequest(shardId, NormalizedEntityId.of(passivate.entityPath), passivate.stopMessage)
if (log.isDebugEnabled) {
log.debug(
s"=== [$currentState] sending a passivation request (shardId=[{}], entityId=[{}], stopMessage=[{}]) to the leader [{}]. ===",
passivationPermitRequest.shardId,
passivationPermitRequest.entityId,
passivationPermitRequest.stopMessage.getClass.getName,
leaderMemberIndex,
)
}
region ! ReplicationRegion.DeliverTo(leaderMemberIndex, passivationPermitRequest)
case None =>
if (log.isDebugEnabled) {
log.debug(
"=== [{}] dropped a Passivate message (entityPath=[{}], stopMessage=[{}]) since there was no leader." +
" The entity should request passivation later again. ===",
currentState,
passivate.entityPath,
passivate.stopMessage.getClass.getName,
)
}
}
}

protected def handleEntityPassivationPermitResponse(response: EntityPassivationPermitResponse): Unit = {
response match {
case EntityPassivationPermitted(entityId, stopMessage) =>
if (log.isDebugEnabled) {
log.debug(
"=== [{}] got a passivation permit (entityId=[{}], stopMessage=[{}]) from the leader [{}]. ===",
currentState,
entityId,
stopMessage.getClass.getName,
sender(),
)
}
suspendEntity(entityId, stopMessage)
case EntityPassivationDenied(entityId) =>
if (log.isDebugEnabled) {
log.debug(
"=== [{}] got a passivation denial (entityId=[{}]) from the leader [{}]. ===",
currentState,
entityId,
sender(),
)
}
}
}

def receiveEntityTerminated(entityId: NormalizedEntityId): Unit = {
if (currentData.entityStateOf(entityId).isPassivating) {
applyDomainEvent(TerminatedEntity(entityId)) { _ => }
} else {
// restart
replicationActor(entityId)
if (currentState == Leader) {
// Non-leaders (followers or candidates) entities might be stopped since the leader could permit passivation mistakenly.
// The leader will recover from the mistake by requesting non-leaders such that non-leaders entities start.
broadcastWithoutSelf(TryCreateEntity(shardId, entityId))
}
}
}

Expand Down Expand Up @@ -512,6 +576,12 @@ private[raft] class RaftActor(
region ! ReplicationRegion.Broadcast(message)
}

/** Send the message to the other members */
private def broadcastWithoutSelf(message: Any): Unit = {
if (log.isDebugEnabled) log.debug("=== [{}] broadcast {} without self ===", currentState, message)
region ! ReplicationRegion.BroadcastWithoutSelf(message)
}

def applyToReplicationActor(logEntry: LogEntry): Unit =
logEntry.event match {
case EntityEvent(_, NoOp) => // NoOp は replicationActor には関係ないので転送しない
Expand Down
Loading