Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance leader's replication response handling #160

Merged
merged 9 commits into from
Jul 27, 2022
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
[Unreleased]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.1.0...master

### Changed
- Enhance leader's replication response handling [PR#160](https://github.com/lerna-stack/akka-entity-replication/pull/160)

### Fixed
- RaftActor might delete committed entries [#152](https://github.com/lerna-stack/akka-entity-replication/issues/152)
⚠️ This fix adds a new persistent event type. It doesn't allow downgrading after being updated.
Expand Down
21 changes: 17 additions & 4 deletions src/main/scala/lerna/akka/entityreplication/raft/Leader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,27 @@ private[raft] trait Leader { this: RaftActor =>

}

private[this] def receiveReplicationResponse(event: ReplicationResponse): Unit =
event match {
private[this] def receiveReplicationResponse(replicationResponse: ReplicationResponse): Unit =
replicationResponse match {

case ReplicationSucceeded(NoOp, _, _) =>
// ignore: no-op replication when become leader

case ReplicationSucceeded(unknownEvent, _, _) =>
if (log.isWarningEnabled) log.warning("unknown event: {}", unknownEvent)
case ReplicationSucceeded(unknownEvent, logEntryIndex, instanceId) =>
if (log.isWarningEnabled) {
log.warning(
"[Leader] received the unexpected ReplicationSucceeded message: event type=[{}], index=[{}], instanceId=[{}]",
unknownEvent.getClass.getName,
logEntryIndex,
instanceId.map(_.underlying),
)
}

case ReplicationFailed =>
if (log.isWarningEnabled) {
log.warning("[Leader] received the unexpected ReplicationFailed message")
}

}

private[this] def startEntityPassivationProcess(entityPath: ActorPath, stopMessage: Any): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,11 @@ private[entityreplication] object RaftProtocol {
final case class Replica(logEntry: LogEntry) extends EntityCommand
final case class TakeSnapshot(metadata: EntitySnapshotMetadata, replyTo: ActorRef) extends EntityCommand
final case object RecoveryTimeout extends EntityCommand
final case object ReplicationFailed extends EntityCommand

sealed trait ReplicationResponse

sealed trait ReplicationResponse extends EntityCommand
final case class ReplicationSucceeded(event: Any, logEntryIndex: LogEntryIndex, instanceId: Option[EntityInstanceId])
extends ReplicationResponse
with EntityCommand
final case object ReplicationFailed extends ReplicationResponse

final case class EntityRecoveryTimeoutException(entityPath: ActorPath) extends RuntimeException
}
Original file line number Diff line number Diff line change
Expand Up @@ -1410,6 +1410,32 @@ class RaftActorLeaderSpec
}
}

"log a warning if it receives a ReplicationSucceeded message containing an event other than NoOp" in {
val leader = createRaftActor()
val leaderData = createLeaderData(Term(1))
setState(leader, Leader, leaderData)

LoggingTestKit
.warn(
"[Leader] received the unexpected ReplicationSucceeded message: event type=[java.lang.String], index=[3], instanceId=[Some(1)]",
).expect {
leader ! ReplicationSucceeded("event-1", LogEntryIndex(3), Option(EntityInstanceId(1)))
}
}

"log a warning if it receives a ReplicationFailed message" in {
val leader = createRaftActor()
val leaderData = createLeaderData(Term(1))
setState(leader, Leader, leaderData)

LoggingTestKit
.warn(
"[Leader] received the unexpected ReplicationFailed message",
).expect {
leader ! ReplicationFailed
}
}

}

private[this] def createLeaderData(
Expand Down