Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add entity diagnostic logs #176

Merged
merged 6 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Add diagnostic logs
[PR#164](https://github.com/lerna-stack/akka-entity-replication/pull/164),
[PR#176](https://github.com/lerna-stack/akka-entity-replication/pull/176),
[PR#177](https://github.com/lerna-stack/akka-entity-replication/pull/177)
- Add function extracting shard id from entity id to lerna.akka.entityreplication.typed.ClusterReplication
[PR#172](https://github.com/lerna-stack/akka-entity-replication/pull/172)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,46 @@ private[entityreplication] class Inactive[Command, Event, State](
protected val setup: BehaviorSetup[Command, Event, State],
) extends ReplicationOperations[Command, Event, State] {

/** State label passed as the leading `[{}]` argument of this behavior's diagnostic log messages. */
override def stateName: String = "Inactive"

def createBehavior(): Behavior[EntityCommand] =
Behaviors
.receiveMessage[EntityCommand] {
case command: RaftProtocol.Activate =>
receiveActivate(command)
case command: RaftProtocol.ProcessCommand =>
if (setup.context.log.isTraceEnabled) {
setup.context.log.trace(
"[{}] Stashing ProcessCommand: commandType=[{}]",
stateName,
command.command.getClass.getName,
)
}
setup.stashBuffer.stash(command)
Behaviors.same
case command: RaftProtocol.Replica =>
if (setup.context.log.isTraceEnabled) {
setup.context.log.trace(
"[{}] Stashing Replica: index=[{}], term=[{}], entityId=[{}], eventType=[{}]",
stateName,
command.logEntry.index,
command.logEntry.term.term,
command.logEntry.event.entityId.map(_.raw),
command.logEntry.event.event.getClass.getName,
)
}
setup.stashBuffer.stash(command)
Behaviors.same
case command: RaftProtocol.TakeSnapshot =>
if (setup.context.log.isTraceEnabled) {
setup.context.log.trace(
"[{}] Stashing TakeSnapshot: index=[{}], entityId=[{}], replyTo=[{}]",
stateName,
command.metadata.logEntryIndex,
command.metadata.entityId.raw,
command.replyTo,
)
}
setup.stashBuffer.stash(command)
Behaviors.same
case _: RaftProtocol.ApplySnapshot => Behaviors.unhandled
Expand All @@ -41,6 +69,14 @@ private[entityreplication] class Inactive[Command, Event, State](
}.receiveSignal(setup.onSignal(setup.emptyState))

/** Handles an `Activate` command.
  *
  * Emits a trace-level diagnostic log entry (only when trace logging is enabled) and then
  * switches to the behavior produced by `Recovering.behavior`, handing over the snapshot store
  * and the recovery index carried by the command.
  */
def receiveActivate(command: RaftProtocol.Activate): Behavior[EntityCommand] = {
  val snapshotStore = command.shardSnapshotStore
  val recoveryIndex = command.recoveryIndex
  val logger        = setup.context.log
  if (logger.isTraceEnabled) {
    logger.trace(
      "[{}] Received Activate: recoveryIndex=[{}], shardSnapshotStore=[{}]",
      stateName,
      recoveryIndex,
      snapshotStore,
    )
  }
  Recovering.behavior(setup, snapshotStore, recoveryIndex)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ private[entityreplication] class Ready[Command, Event, State](

import Ready._

/** State label passed as the leading `[{}]` argument of this behavior's diagnostic log messages. */
override def stateName: String = "Ready"

private[this] type BehaviorState = ReadyState[State]

def createBehavior(readyState: BehaviorState): Behavior[EntityCommand] =
Expand Down Expand Up @@ -90,6 +92,16 @@ private[entityreplication] class Ready[Command, Event, State](
}

/** Handles a `Replica` command while in this state.
  *
  * Writes a trace-level diagnostic log entry describing the received log entry (only when trace
  * logging is enabled), applies the entry's event to `state` at the entry's index, and continues
  * with the behavior created from the resulting state.
  */
def receiveReplica(command: RaftProtocol.Replica, state: BehaviorState): Behavior[EntityCommand] = {
  val entry  = command.logEntry
  val logger = context.log
  if (logger.isTraceEnabled) {
    logger.trace(
      "[{}] Received Replica: index=[{}], term=[{}], entityId=[{}], eventType=[{}]",
      stateName,
      entry.index,
      entry.term.term,
      entry.event.entityId.map(_.raw),
      entry.event.event.getClass.getName,
    )
  }
  val nextState = state.applyEvent(setup, entry.event.event, entry.index)
  createBehavior(nextState)
}
Expand All @@ -116,6 +128,17 @@ private[entityreplication] class Ready[Command, Event, State](
state: BehaviorState,
effect: EffectImpl[Event, State],
): Behavior[EntityCommand] = {
if (context.log.isTraceEnabled) {
context.log.trace(
"[{}] Sending Replicate: entityId=[{}], instanceId=[{}], eventType=[{}], replyTo=[{}], to=[{}]",
stateName,
setup.entityContext.entityId,
setup.instanceId.underlying,
event.getClass.getName,
context.self.toClassic,
setup.shard,
)
}
setup.shard ! RaftProtocol.Replicate(
event = event,
replyTo = context.self.toClassic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import lerna.akka.entityreplication.raft.RaftProtocol
import lerna.akka.entityreplication.raft.RaftProtocol.EntityCommand
import lerna.akka.entityreplication.raft.model.LogEntryIndex
import lerna.akka.entityreplication.raft.model.{ LogEntry, LogEntryIndex }
import lerna.akka.entityreplication.raft.protocol.{ FetchEntityEvents, FetchEntityEventsResponse }
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol.EntitySnapshot
Expand All @@ -29,6 +29,8 @@ private[entityreplication] class Recovering[Command, Event, State](

import Recovering._

/** State label passed as the leading `[{}]` argument of this behavior's diagnostic log messages. */
override def stateName: String = "Recovering"

def createBehavior(
shardSnapshotStore: ActorRef[SnapshotProtocol.Command],
recoveryIndex: LogEntryIndex,
Expand All @@ -45,12 +47,27 @@ private[entityreplication] class Recovering[Command, Event, State](
case FetchEntityEventsResponse(events) => RaftProtocol.RecoveryState(events, snapshot)
}

if (context.log.isTraceEnabled) {
context.log.trace(
"[{}] Sending FetchSnapshot: entityId=[{}], to=[{}]",
stateName,
setup.replicationId.entityId.raw,
shardSnapshotStore,
)
}
shardSnapshotStore ! SnapshotProtocol.FetchSnapshot(
setup.replicationId.entityId,
replyTo = fetchSnapshotResponseMapper.toClassic,
)

Behaviors.withTimers { scheduler =>
if (context.log.isTraceEnabled) {
context.log.trace(
"[{}] Starting single RecoveryTimeoutTimer: delay=[{}]",
stateName,
setup.settings.recoveryEntityTimeout,
)
}
scheduler.startSingleTimer(
RecoveryTimeoutTimer,
RaftProtocol.RecoveryTimeout,
Expand All @@ -59,35 +76,103 @@ private[entityreplication] class Recovering[Command, Event, State](
Behaviors
.receiveMessage[EntityCommand] {
case command: RaftProtocol.ApplySnapshot =>
if (context.log.isTraceEnabled) {
context.log.trace(
"[{}] Received ApplySnapshot: index=[{}], entityId=[{}], stateType=[{}]",
stateName,
command.entitySnapshot.map(_.metadata.logEntryIndex),
command.entitySnapshot.map(_.metadata.entityId.raw),
command.entitySnapshot.map(_.state.underlying.getClass.getName),
)
}
val snapshotIndex = command.entitySnapshot match {
case Some(snapshot) => snapshot.metadata.logEntryIndex
case None => LogEntryIndex.initial()
}
val replyTo =
fetchEntityEventsResponseMapper(command.entitySnapshot)
if (context.log.isTraceEnabled) {
context.log.trace(
"[{}] Sending FetchEntityEvents: entityId=[{}], fromIndex=[{}], toIndex=[{}], replyTo=[{}], to=[{}]",
stateName,
setup.replicationId.entityId.raw,
snapshotIndex.next(),
recoveryIndex,
replyTo,
setup.shard,
)
}
setup.shard ! FetchEntityEvents(
setup.replicationId.entityId,
from = snapshotIndex.next(),
to = recoveryIndex,
fetchEntityEventsResponseMapper(command.entitySnapshot),
replyTo,
)
Behaviors.same
case command: RaftProtocol.RecoveryState =>
if (context.log.isTraceEnabled) {
def toLogMessage(logEntry: LogEntry): String = {
val entityId = logEntry.event.entityId.map(_.raw)
val eventType = logEntry.event.event.getClass.getName
s"index=${logEntry.index}, term=${logEntry.term.term}, entityId=$entityId, eventType=$eventType"
}
context.log.trace(
"[{}] Received RecoveryState: " +
"snapshot.index=[{}], snapshot.entityId=[{}], snapshot.stateType=[{}], " +
"events.size=[{}], events.head=[{}], events.last=[{}]",
stateName,
command.snapshot.map(_.metadata.logEntryIndex),
command.snapshot.map(_.metadata.entityId.raw),
command.snapshot.map(_.state.underlying.getClass),
command.events.size,
command.events.headOption.map(toLogMessage),
command.events.lastOption.map(toLogMessage),
)
}
scheduler.cancel(RecoveryTimeoutTimer)
receiveRecoveryState(command)
case RaftProtocol.RecoveryTimeout =>
if (context.log.isInfoEnabled)
context.log.info(
"Entity (name: {}) recovering timed out. It will be retried later.",
"[{}] Entity (name: [{}]) recovering timed out. It will be retried later.",
stateName,
setup.entityContext.entityId,
)
// TODO: Enable backoff to prevent cascade failures
throw RaftProtocol.EntityRecoveryTimeoutException(context.self.path)
case command: RaftProtocol.ProcessCommand =>
if (context.log.isTraceEnabled) {
context.log.trace(
"[{}] Stashing ProcessCommand: commandType=[{}]",
stateName,
command.command.getClass.getName,
)
}
setup.stashBuffer.stash(command)
Behaviors.same
case command: RaftProtocol.Replica =>
if (context.log.isTraceEnabled) {
context.log.trace(
"[{}] Stashing Replica: index=[{}], term=[{}], entityId=[{}], eventType=[{}]",
stateName,
command.logEntry.index,
command.logEntry.term.term,
command.logEntry.event.entityId.map(_.raw),
command.logEntry.event.event.getClass.getName,
)
}
setup.stashBuffer.stash(command)
Behaviors.same
case command: RaftProtocol.TakeSnapshot =>
if (context.log.isTraceEnabled) {
setup.context.log.trace(
"[{}] Stashing TakeSnapshot: index=[{}], entityId=[{}], replyTo=[{}]",
stateName,
command.metadata.logEntryIndex,
command.metadata.entityId.raw,
command.replyTo,
)
}
setup.stashBuffer.stash(command)
Behaviors.same
case _: RaftProtocol.Activate => Behaviors.unhandled
Expand All @@ -107,10 +192,26 @@ private[entityreplication] class Recovering[Command, Event, State](
}
val snapshotAppliedState =
ReadyState(entityState, lastAppliedLogIndex)
if (context.log.isTraceEnabled) {
context.log.trace(
"[{}] Recovering with initial state: index=[{}], stateType=[{}]",
stateName,
snapshotAppliedState.lastAppliedLogEntryIndex,
snapshotAppliedState.entityState.getClass.getName,
)
}
val eventAppliedState =
command.events.foldLeft(snapshotAppliedState)((state, entry) =>
state.applyEvent(setup, entry.event.event, entry.index),
)
if (context.log.isTraceEnabled) {
context.log.trace(
"[{}] Recovered with state: index=[{}], stateType=[{}]",
stateName,
eventAppliedState.lastAppliedLogEntryIndex,
eventAppliedState.entityState.getClass.getName,
)
}
Ready.behavior(setup, eventAppliedState)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,32 @@ private[entityreplication] trait ReplicationOperations[Command, Event, State] {

/** Shortcut for the actor context held by `setup`. */
protected def context: ActorContext[EntityCommand] = setup.context

/** The state name of a Behavior that executes replication operations.
  *
  * This state name is only for diagnostic logging: it is passed as the leading `[{}]`
  * argument of the trace log messages written by the replication operations.
  */
protected def stateName: String

/** Handles a `TakeSnapshot` command by replying with a snapshot of the given entity state.
  *
  * Two trace-level diagnostic entries are written (each only when trace logging is enabled):
  * one for the received command and one for the `Snapshot` reply about to be sent. The reply
  * carries the command's metadata together with `entityState` wrapped in `EntityState`.
  * The current behavior is kept.
  *
  * @param command     the snapshot request; its `replyTo` receives the `Snapshot` message
  * @param entityState the entity state to put into the snapshot
  */
def receiveTakeSnapshot(command: TakeSnapshot, entityState: State): Behavior[EntityCommand] = {
  val TakeSnapshot(metadata, replyTo) = command
  val logger                          = context.log
  if (logger.isTraceEnabled) {
    logger.trace(
      "[{}] Received TakeSnapshot: index=[{}], entityId=[{}], replyTo=[{}]",
      stateName,
      metadata.logEntryIndex,
      metadata.entityId.raw,
      replyTo,
    )
  }
  if (logger.isTraceEnabled) {
    logger.trace(
      "[{}] Sending Snapshot: index=[{}], entityId=[{}], stateType=[{}], to=[{}]",
      stateName,
      metadata.logEntryIndex,
      metadata.entityId.raw,
      entityState.getClass.getName,
      replyTo,
    )
  }
  val snapshot = Snapshot(metadata, EntityState(entityState))
  replyTo ! snapshot
  Behaviors.same
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,29 @@ private[entityreplication] class WaitForReplication[Command, Event, State](

import WaitForReplication._

/** State label passed as the leading `[{}]` argument of this behavior's diagnostic log messages. */
override def stateName: String = "WaitForReplication"

private[this] type BehaviorState = WaitForReplicationState[State]

def createBehavior(state: BehaviorState): Behavior[EntityCommand] =
Behaviors
.receiveMessage[EntityCommand] {
case command: RaftProtocol.Replica => receiveReplica(command, state)
case command: RaftProtocol.ReplicationSucceeded => receiveReplicationSucceeded(command, state)
case RaftProtocol.ReplicationFailed => Ready.behavior(setup, transformReadyState(state)) // Discard side effects
case command: RaftProtocol.TakeSnapshot => receiveTakeSnapshot(command, state.entityState)
case RaftProtocol.ReplicationFailed =>
if (context.log.isTraceEnabled) {
context.log.trace("[{}] Received ReplicationFailed", stateName)
}
Ready.behavior(setup, transformReadyState(state)) // Discard side effects
case command: RaftProtocol.TakeSnapshot => receiveTakeSnapshot(command, state.entityState)
case command: RaftProtocol.ProcessCommand =>
if (context.log.isTraceEnabled) {
context.log.trace(
"[{}] Stashing ProcessCommand: commandType=[{}]",
stateName,
command.command.getClass.getName,
)
}
setup.stashBuffer.stash(command)
Behaviors.same
case _: RaftProtocol.Activate => Behaviors.unhandled
Expand All @@ -51,6 +64,16 @@ private[entityreplication] class WaitForReplication[Command, Event, State](
}.receiveSignal(setup.onSignal(state.entityState))

private[this] def receiveReplica(command: RaftProtocol.Replica, state: BehaviorState): Behavior[EntityCommand] = {
if (context.log.isTraceEnabled) {
context.log.trace(
"[{}] Received Replica: index=[{}], term=[{}], entityId=[{}], eventType=[{}]",
stateName,
command.logEntry.index,
command.logEntry.term.term,
command.logEntry.event.entityId.map(_.raw),
command.logEntry.event.event.getClass.getName,
)
}
// ReplicatedEntityBehavior can receive a Replica message when the RaftActor is demoted to Follower while replicating an event
Ready.behavior(
setup,
Expand All @@ -67,6 +90,15 @@ private[entityreplication] class WaitForReplication[Command, Event, State](
"ReplicationSucceeded received by the Entity should contain a instanceId",
// Entity sends a Replicate command which contains the instanceId
)
if (context.log.isTraceEnabled) {
context.log.trace(
"[{}] Received ReplicationSucceeded: index=[{}], instanceId=[{}], eventType=[{}]",
stateName,
command.logEntryIndex,
command.instanceId.map(_.underlying),
command.event.getClass.getName,
)
}
if (command.instanceId.contains(setup.instanceId)) {
val event = EntityEvent(Option(setup.replicationId.entityId), command.event)
val newState = transformReadyState(state).applyEvent(setup, event.event, command.logEntryIndex)
Expand Down