Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add snapshot synchronization diagnostic logs #177

Merged
merged 4 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
[Unreleased]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.1.0...master

### Added
- Add diagnostic logs
[PR#164](https://github.com/lerna-stack/akka-entity-replication/pull/164),
[PR#177](https://github.com/lerna-stack/akka-entity-replication/pull/177)
- Add function extracting shard id from entity id to lerna.akka.entityreplication.typed.ClusterReplication
[PR#172](https://github.com/lerna-stack/akka-entity-replication/pull/172)
- Add function of Disabling raft actor [PR#173](https://github.com/lerna-stack/akka-entity-replication/pull/173)

### Changed
- Enhance leader's replication response handling [PR#160](https://github.com/lerna-stack/akka-entity-replication/pull/160)
- Change event sourcing log level to debug
[PR#163](https://github.com/lerna-stack/akka-entity-replication/pull/163)
- ReplicationRegionRaftActorStarter uses its FQCN as its logger name
[PR#178](https://github.com/lerna-stack/akka-entity-replication/pull/178)
- Add diagnostic logging to CommitLogStoreActor
[PR#164](https://github.com/lerna-stack/akka-entity-replication/pull/164)
- Add function extracting shard id from entity id to lerna.akka.entityreplication.typed.ClusterReplication
[PR#172](https://github.com/lerna-stack/akka-entity-replication/pull/172)
- Add function of Disabling raft actor [PR#173](https://github.com/lerna-stack/akka-entity-replication/pull/173)
- Add diagnostic info to logs of sending replication results
[PR#179](https://github.com/lerna-stack/akka-entity-replication/pull/179)

Expand Down
38 changes: 36 additions & 2 deletions src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,15 @@ private[raft] class RaftActor(
response match {
case response: SnapshotSyncManager.SyncSnapshotSucceeded =>
applyDomainEvent(SnapshotSyncCompleted(response.snapshotLastLogTerm, response.snapshotLastLogIndex)) { _ =>
if (log.isInfoEnabled) {
log.info(
"[{}] Completed snapshot synchronization: srcMemberIndex=[{}], snapshotLastLogTerm=[{}], snapshotLastLogIndex=[{}]",
currentState,
response.srcMemberIndex,
response.snapshotLastLogTerm.term,
response.snapshotLastLogIndex,
)
}
region ! ReplicationRegion.DeliverTo(
response.srcMemberIndex,
InstallSnapshotSucceeded(
Expand All @@ -754,6 +763,15 @@ private[raft] class RaftActor(
}

case response: SnapshotSyncManager.SyncSnapshotAlreadySucceeded =>
if (log.isInfoEnabled) {
log.info(
"[{}] Completed snapshot synchronization already: srcMemberIndex=[{}], snapshotLastLogTerm=[{}], snapshotLastLogIndex=[{}]",
currentState,
response.srcMemberIndex,
response.snapshotLastLogTerm.term,
response.snapshotLastLogIndex,
)
}
region ! ReplicationRegion.DeliverTo(
response.srcMemberIndex,
InstallSnapshotSucceeded(
Expand All @@ -764,7 +782,11 @@ private[raft] class RaftActor(
),
)

case _: SnapshotSyncManager.SyncSnapshotFailed => // ignore
case _: SnapshotSyncManager.SyncSnapshotFailed =>
// ignore
if (log.isWarningEnabled) {
log.warning("[{}] Failed snapshot synchronization", currentState)
}
}

private val snapshotSyncManagerName: String = ActorIds.actorName(
Expand All @@ -777,7 +799,8 @@ private[raft] class RaftActor(
// Snapshot updates during compaction will break consistency
if (log.isInfoEnabled)
log.info(
"Skipping snapshot synchronization because compaction is in progress (remaining: {}/{})",
"[{}] Skipping snapshot synchronization because compaction is in progress (remaining: {}/{})",
currentState,
currentData.snapshottingProgress.inProgressEntities.size,
currentData.snapshottingProgress.inProgressEntities.size + currentData.snapshottingProgress.completedEntities.size,
)
Expand All @@ -796,6 +819,17 @@ private[raft] class RaftActor(
snapshotSyncManagerName,
)
}
if (log.isDebugEnabled) {
log.debug(
s"[${currentState}] Starting snapshot synchronization " +
"(srcLatestSnapshotLastLogTerm=[{}], srcLatestSnapshotLastLogIndex=[{}], " +
"dstLatestSnapshotLastLogTerm=[{}], dstLatestSnapshotLastLogIndex=[{}])",
installSnapshot.srcLatestSnapshotLastLogTerm.term,
installSnapshot.srcLatestSnapshotLastLogLogIndex,
currentData.lastSnapshotStatus.snapshotLastTerm.term,
currentData.lastSnapshotStatus.snapshotLastLogIndex,
)
}
snapshotSyncManager ! SnapshotSyncManager.SyncSnapshot(
srcLatestSnapshotLastLogTerm = installSnapshot.srcLatestSnapshotLastLogTerm,
srcLatestSnapshotLastLogIndex = installSnapshot.srcLatestSnapshotLastLogLogIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package lerna.akka.entityreplication.raft.snapshot.sync
import akka.actor.{ ActorLogging, ActorRef, Props, Status }
import akka.pattern.extended.ask
import akka.pattern.pipe
import akka.persistence.{ PersistentActor, RuntimePluginConfig, SnapshotOffer }
import akka.persistence.{ PersistentActor, RecoveryCompleted, RuntimePluginConfig, SnapshotOffer }
import akka.persistence.query.{ EventEnvelope, Offset, PersistenceQuery }
import akka.persistence.query.scaladsl.CurrentEventsByTagQuery
import akka.stream.{ KillSwitches, UniqueKillSwitch }
Expand Down Expand Up @@ -222,10 +222,18 @@ private[entityreplication] class SnapshotSyncManager(

override def receiveRecover: Receive = {

case SnapshotOffer(_, snapshot: SyncProgress) =>
case SnapshotOffer(metadata, snapshot: SyncProgress) =>
if (log.isInfoEnabled) {
log.info("Loaded snapshot: metadata=[{}], snapshot=[{}]", metadata, snapshot)
}
this.state = snapshot

case event: Event => updateState(event)

case RecoveryCompleted =>
if (log.isInfoEnabled) {
log.info("Recovery completed: state=[{}]", this.state)
}
}

private[this] var state = SyncProgress(Offset.noOffset)
Expand Down Expand Up @@ -286,10 +294,16 @@ private[entityreplication] class SnapshotSyncManager(
s"(typeName: $typeName, memberIndex: $dstMemberIndex, snapshotLastLogTerm: ${dstLatestSnapshotLastLogTerm.term}, snapshotLastLogIndex: $dstLatestSnapshotLastLogIndex)",
)

case _: akka.persistence.SaveSnapshotSuccess =>
case akka.persistence.SaveSnapshotSuccess(metadata) =>
if (log.isInfoEnabled) {
log.info("Succeeded to save snapshot synchronization progress: metadata=[{}]", metadata)
}
context.stop(self)

case _: akka.persistence.SaveSnapshotFailure =>
case akka.persistence.SaveSnapshotFailure(metadata, cause) =>
if (log.isWarningEnabled) {
log.warning("Failed to save snapshot synchronization progress: metadata=[{}], cause=[{}]", metadata, cause)
}
context.stop(self)
}

Expand All @@ -300,7 +314,10 @@ private[entityreplication] class SnapshotSyncManager(
dstLatestSnapshotLastLogIndex: LogEntryIndex,
): Receive = {

case _: SyncSnapshot => // ignore
case syncSnapshot: SyncSnapshot =>
if (log.isDebugEnabled) {
log.debug("Dropping [{}] since the snapshot synchronization is running.", syncSnapshot)
}

case syncStatus: SyncStatus =>
this.killSwitch = None
Expand All @@ -318,6 +335,13 @@ private[entityreplication] class SnapshotSyncManager(
updateState(event)
if (event.snapshotLastLogIndex < srcLatestSnapshotLastLogIndex) {
// complete partially
if (log.isDebugEnabled) {
log.debug(
"Snapshot synchronization partially completed and continues: {} -> {}",
s"(typeName: $typeName, memberIndex: $srcMemberIndex, snapshotLastLogIndex: ${event.snapshotLastLogIndex}/${srcLatestSnapshotLastLogIndex})",
s"(typeName: $typeName, memberIndex: $dstMemberIndex, snapshotLastLogTerm: ${dstLatestSnapshotLastLogTerm.term}, snapshotLastLogIndex: $dstLatestSnapshotLastLogIndex)",
)
}
startSnapshotSynchronizationBatch(
srcLatestSnapshotLastLogIndex,
dstLatestSnapshotLastLogTerm,
Expand Down Expand Up @@ -374,8 +398,17 @@ private[entityreplication] class SnapshotSyncManager(
)
context.stop(self)

case _: akka.persistence.SaveSnapshotSuccess => // ignore: previous execution result
case _: akka.persistence.SaveSnapshotFailure => // ignore: previous execution result
case saveSnapshotSuccess: akka.persistence.SaveSnapshotSuccess =>
// ignore: previous execution result
if (log.isDebugEnabled) {
log.debug("Dropping [{}] of the previous synchronization.", saveSnapshotSuccess)
}

case saveSnapshotFailure: akka.persistence.SaveSnapshotFailure =>
// ignore: previous execution result
if (log.isDebugEnabled) {
log.debug("Dropping [{}] of the previous synchronization.", saveSnapshotFailure)
}
}

def updateState(event: Event): Unit =
Expand Down