Skip to content

Commit

Permalink
Merge pull request #179 from lerna-stack/revise-logs-of-sending-repli…
Browse files Browse the repository at this point in the history
…cation-results

Add diagnostic info to logs of sending replication results
  • Loading branch information
negokaz authored Nov 7, 2022
2 parents c18e940 + 4321e1c commit 7629647
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add function extracting shard id from entity id to lerna.akka.entityreplication.typed.ClusterReplication
[PR#172](https://github.com/lerna-stack/akka-entity-replication/pull/172)
- Add function of Disabling raft actor [PR#173](https://github.com/lerna-stack/akka-entity-replication/pull/173)
- Add diagnostic info to logs of sending replication results
[PR#179](https://github.com/lerna-stack/akka-entity-replication/pull/179)

### Fixed
- RaftActor might delete committed entries
Expand Down
35 changes: 29 additions & 6 deletions src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,15 @@ private[raft] class RaftActor(
.handleCommittedLogEntriesAndClients { entries =>
entries.foreach {
case (logEntry, Some(client)) =>
if (log.isDebugEnabled)
log.debug("=== [Leader] committed {} and will notify it to {} ===", logEntry, client)
if (log.isDebugEnabled) {
log.debug(
s"=== [${currentState}] Committed event" +
s" (index=[${logEntry.index}], term=[${logEntry.term.term}]," +
s" entityId=[${logEntry.event.entityId.map(_.raw)}]," +
s" type=[${logEntry.event.event.getClass.getName}])" +
s" and sending ReplicationSucceeded to [$client] ===",
)
}
client.forward(ReplicationSucceeded(logEntry.event.event, logEntry.index, client.instanceId))
case (logEntry, None) =>
// 復旧中の commit or リーダー昇格時に未コミットのログがあった場合の commit
Expand Down Expand Up @@ -494,11 +501,27 @@ private[raft] class RaftActor(
logEntry.event match {
case EntityEvent(_, NoOp) => // NoOp は replicationActor には関係ないので転送しない
case EntityEvent(Some(entityId), event) =>
if (log.isDebugEnabled) log.debug("=== [{}] applying {} to ReplicationActor ===", currentState, event)
replicationActor(entityId) ! Replica(logEntry)
val targetReplicationActor = replicationActor(entityId)
if (log.isDebugEnabled) {
log.debug(
s"=== [${currentState}] Sending Replica to apply event" +
s" (index=[${logEntry.index}], term=[${logEntry.term.term}]," +
s" entityId=[${entityId.raw}]," +
s" eventType=[${event.getClass.getName}])" +
s" to ReplicationActor [$targetReplicationActor] ===",
)
}
targetReplicationActor ! Replica(logEntry)
case EntityEvent(None, event) =>
if (log.isWarningEnabled)
log.warning("=== [{}] {} was not applied, because it is not assigned any entity ===", currentState, event)
if (log.isWarningEnabled) {
log.warning(
"[{}] event [index=[{}], term=[{}], type={}] was not applied, because it is not assigned any entity",
currentState,
logEntry.index,
logEntry.term.term,
event.getClass.getName,
)
}
}

def handleSnapshotTick(): Unit = {
Expand Down

0 comments on commit 7629647

Please sign in to comment.