Skip to content

Commit

Permalink
Merge pull request #196 from lerna-stack/snapshot-store-diagnostic-lo…
Browse files Browse the repository at this point in the history
…gging

Add diagnostic logs to SnapshotStore
  • Loading branch information
negokaz authored Nov 30, 2022
2 parents 3112d45 + efce77e commit ac4f10f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add diagnostic logs
[PR#164](https://github.com/lerna-stack/akka-entity-replication/pull/164),
[PR#176](https://github.com/lerna-stack/akka-entity-replication/pull/176),
[PR#177](https://github.com/lerna-stack/akka-entity-replication/pull/177)
[PR#177](https://github.com/lerna-stack/akka-entity-replication/pull/177),
[PR#196](https://github.com/lerna-stack/akka-entity-replication/pull/196)
- Add function extracting shard id from entity id to lerna.akka.entityreplication.typed.ClusterReplication
[PR#172](https://github.com/lerna-stack/akka-entity-replication/pull/172)
- Add function of Disabling raft actor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,19 @@ private[entityreplication] class SnapshotStore(

override def snapshotPluginId: String = settings.snapshotStorePluginId

context.setReceiveTimeout(settings.compactionSnapshotCacheTimeToLive)
override def preStart(): Unit = {
if (log.isDebugEnabled) {
log.debug(
"Starting SnapshotStore (typeName=[{}], entityId=[{}], memberIndex=[{}]), which will stop when it is inactive for snapshot-cache-time-to-live period [{}] ms",
typeName,
entityId.raw,
selfMemberIndex,
settings.compactionSnapshotCacheTimeToLive.toMillis,
)
}
super.preStart()
context.setReceiveTimeout(settings.compactionSnapshotCacheTimeToLive)
}

override def receiveRecover: Receive = {
case snapshot: EntitySnapshot => context.become(hasSnapshot(snapshot))
Expand Down Expand Up @@ -93,10 +105,34 @@ private[entityreplication] class SnapshotStore(
// reduce IO: don't save if same as cached snapshot
command.replyTo ! SaveSnapshotSuccess(command.snapshot.metadata)
} else {
if (log.isDebugEnabled) {
log.debug(
"Saving EntitySnapshot: entityId=[{}], logEntryIndex=[{}], stateType=[{}]",
command.snapshot.metadata.entityId.raw,
command.snapshot.metadata.logEntryIndex,
command.snapshot.state.underlying.getClass.getName,
)
}
replyRefCache = Option(command.replyTo)
persistAsync(command.snapshot) { _ =>
if (log.isDebugEnabled) {
log.debug(
"Saved EntitySnapshot: entityId=[{}], logEntryIndex=[{}], stateType=[{}]",
command.snapshot.metadata.entityId.raw,
command.snapshot.metadata.logEntryIndex,
command.snapshot.state.underlying.getClass.getName,
)
}
command.replyTo ! SaveSnapshotSuccess(command.snapshot.metadata)
if (lastSequenceNr % settings.snapshotStoreSnapshotEvery == 0 && lastSequenceNr != 0) {
if (log.isDebugEnabled) {
log.debug(
"Saving EntitySnapshot as a snapshot: entityId=[{}], logEntryIndex=[{}], stateType=[{}]",
command.snapshot.metadata.entityId.raw,
command.snapshot.metadata.logEntryIndex,
command.snapshot.state.underlying.getClass.getName,
)
}
saveSnapshot(command.snapshot)
}
context.become(hasSnapshot(command.snapshot))
Expand Down Expand Up @@ -127,6 +163,14 @@ private[entityreplication] class SnapshotStore(
override def unhandled(message: Any): Unit =
message match {
case ReceiveTimeout =>
if (log.isDebugEnabled) {
log.debug(
"Stopping SnapshotStore (typeName=[{}], entityId=[{}], memberIndex=[{}]) since it is inactive",
typeName,
entityId.raw,
selfMemberIndex,
)
}
context.stop(self)
case _: persistence.SaveSnapshotSuccess =>
if (log.isDebugEnabled) {
Expand Down

0 comments on commit ac4f10f

Please sign in to comment.