Skip to content

Commit

Permalink
Merge pull request #160 from lerna-stack/enhance-leaders-replication-…
Browse files Browse the repository at this point in the history
…response-handling

Enhance leader's replication response handling
  • Loading branch information
negokaz authored Jul 27, 2022
2 parents e415a69 + e825da2 commit 757daa9
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 8 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
[Unreleased]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.1.0...master

### Changed
- Enhance leader's replication response handling [PR#160](https://github.com/lerna-stack/akka-entity-replication/pull/160)

### Fixed
- RaftActor might delete committed entries [#152](https://github.com/lerna-stack/akka-entity-replication/issues/152)
⚠️ This fix adds a new persistent event type. It doesn't allow downgrading after being updated.
Expand Down
21 changes: 17 additions & 4 deletions src/main/scala/lerna/akka/entityreplication/raft/Leader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,27 @@ private[raft] trait Leader { this: RaftActor =>

}

private[this] def receiveReplicationResponse(event: ReplicationResponse): Unit =
event match {
private[this] def receiveReplicationResponse(replicationResponse: ReplicationResponse): Unit =
replicationResponse match {

case ReplicationSucceeded(NoOp, _, _) =>
// ignore: no-op replication when become leader

case ReplicationSucceeded(unknownEvent, _, _) =>
if (log.isWarningEnabled) log.warning("unknown event: {}", unknownEvent)
case ReplicationSucceeded(unknownEvent, logEntryIndex, instanceId) =>
if (log.isWarningEnabled) {
log.warning(
"[Leader] received the unexpected ReplicationSucceeded message: event type=[{}], index=[{}], instanceId=[{}]",
unknownEvent.getClass.getName,
logEntryIndex,
instanceId.map(_.underlying),
)
}

case ReplicationFailed =>
if (log.isWarningEnabled) {
log.warning("[Leader] received the unexpected ReplicationFailed message")
}

}

private[this] def startEntityPassivationProcess(entityPath: ActorPath, stopMessage: Any): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,11 @@ private[entityreplication] object RaftProtocol {
final case class Replica(logEntry: LogEntry) extends EntityCommand
final case class TakeSnapshot(metadata: EntitySnapshotMetadata, replyTo: ActorRef) extends EntityCommand
final case object RecoveryTimeout extends EntityCommand
final case object ReplicationFailed extends EntityCommand

sealed trait ReplicationResponse

sealed trait ReplicationResponse extends EntityCommand
final case class ReplicationSucceeded(event: Any, logEntryIndex: LogEntryIndex, instanceId: Option[EntityInstanceId])
extends ReplicationResponse
with EntityCommand
final case object ReplicationFailed extends ReplicationResponse

final case class EntityRecoveryTimeoutException(entityPath: ActorPath) extends RuntimeException
}
Original file line number Diff line number Diff line change
Expand Up @@ -1410,6 +1410,32 @@ class RaftActorLeaderSpec
}
}

"log a warning if it receives a ReplicationSucceeded message containing an event other than NoOp" in {
val leader = createRaftActor()
val leaderData = createLeaderData(Term(1))
setState(leader, Leader, leaderData)

LoggingTestKit
.warn(
"[Leader] received the unexpected ReplicationSucceeded message: event type=[java.lang.String], index=[3], instanceId=[Some(1)]",
).expect {
leader ! ReplicationSucceeded("event-1", LogEntryIndex(3), Option(EntityInstanceId(1)))
}
}

"log a warning if it receives a ReplicationFailed message" in {
val leader = createRaftActor()
val leaderData = createLeaderData(Term(1))
setState(leader, Leader, leaderData)

LoggingTestKit
.warn(
"[Leader] received the unexpected ReplicationFailed message",
).expect {
leader ! ReplicationFailed
}
}

}

private[this] def createLeaderData(
Expand Down

0 comments on commit 757daa9

Please sign in to comment.