Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into add-snapshot-synchr…
Browse files Browse the repository at this point in the history
…onization-diagnostic-logs
  • Loading branch information
Taichi Yamakawa committed Nov 8, 2022
2 parents 7d10d68 + 7629647 commit 255b7f5
Show file tree
Hide file tree
Showing 10 changed files with 351 additions and 116 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Enhance leader's replication response handling [PR#160](https://github.com/lerna-stack/akka-entity-replication/pull/160)
- Change event sourcing log level to debug
[PR#163](https://github.com/lerna-stack/akka-entity-replication/pull/163)
- ReplicationRegionRaftActorStarter uses its FQCN as its logger name
[PR178](https://github.com/lerna-stack/akka-entity-replication/pull/178)
- Add diagnostic info to logs of sending replication results
[PR#179](https://github.com/lerna-stack/akka-entity-replication/pull/179)

### Fixed
- RaftActor might delete committed entries
Expand All @@ -44,6 +48,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Snapshot synchronization could remove committed log entries that not be included in snapshots
[#167](https://github.com/lerna-stack/akka-entity-replication/issues/167)
[#PR168](https://github.com/lerna-stack/akka-entity-replication/pull/168)
- SnapshotStore doesn't reply with SnapshotNotFound sometimes
[#182](https://github.com/lerna-stack/akka-entity-replication/issues/182),
[#PR183](https://github.com/lerna-stack/akka-entity-replication/pull/183)

## [v2.1.0] - 2022-03-24
[v2.1.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.0.0...v2.1.0
Expand Down
51 changes: 51 additions & 0 deletions docs/typed/implementation_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,57 @@ Consistency is ensured when it processes operations that can effect outside the
The entity will output results base on the consistent up-to-date state even if under the network partitioning.
The commands will be fail on one side of the partitioned network to keep consistency.

### Detecting data inconsistencies by Entity Implementation

While akka-entity-replication 2.2.0 or above closes some data inconsistency issues,
detecting such inconsistency issues by entity implementation is preferred.
An entity can use the following techniques to detect data inconsistencies:

* To detect an event duplication and miss, use an event number. As the state of the entity, the entity holds the event
number (called LastAppliedEventNumber) of the last event the entity applied itself. Furthermore, the entity puts the
event number (specifically, LastAppliedEventNumber plus one) to an event. The event handler of the entity verifies
that the event has the expected event number (specifically, the event number must be equal to LastAppliedEventNumber
plus one). If this verification fails, either an event duplication or miss has happened.
* To detect an event misdelivery, put the entity ID to an event. The event handler of the entity verifies that the event
has the same entity ID as its own. If this verification fails, an event misdelivery has happened.

The following example illustrates how an entity detects data inconsistencies:

```scala
import lerna.akka.entityreplication.typed._

object MyReplicatedEntity {
final case class Command()
final case class Event(entityId: String, eventNo: Long)
final case class State(lastAppliedEventNo: Long)

def apply(entityContext: ReplicatedEntityContext[Command]): Behavior[Command] =
ReplicatedEntityBehavior[Command, Event, State](
entityContext,
emptyState = State(lastAppliedEventNo = 0),
commandHandler = (state, command) => {
if (??? /* the command is not processed yet */) {
// Replicate an event as below:
// - To detect an event duplication and miss, put the event number (`state.lastAppliedEventNo + 1`) to the event.
// - To detect an event misdelivery, put the entity ID (`entityContext.entityId`) to the event.
Effect.replicate(Event(entityContext.entityId, state.lastAppliedEventNo + 1))
} else {
// Replicate nothing
???
}
},
eventHandler = (state, event) => {
// To detect an event duplication and miss, verifies the event has the expected event number:
require(event.eventNo == state.lastAppliedEventNo + 1)
// To detect an event misdelivery, verifies the event has the expected entity ID:
require(event.entityId == entityContext.entityId)
// The next state must set the event number of the event to LastAppliedEventNo:
State(event.eventNo)
}
)
}
```

### Passivation

You can stop entities that are not used to reduce memory consumption.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ private[entityreplication] object ReplicationRegionRaftActorStarter {
): Behavior[Nothing] = {
Behaviors
.setup[Command] { context =>
context.setLoggerName(this.getClass)
val (disableIds, enableIds) = ids.partition(settings.disabledShards.contains)
if (disableIds.nonEmpty) {
context.log.info(
Expand Down
35 changes: 29 additions & 6 deletions src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,15 @@ private[raft] class RaftActor(
.handleCommittedLogEntriesAndClients { entries =>
entries.foreach {
case (logEntry, Some(client)) =>
if (log.isDebugEnabled)
log.debug("=== [Leader] committed {} and will notify it to {} ===", logEntry, client)
if (log.isDebugEnabled) {
log.debug(
s"=== [${currentState}] Committed event" +
s" (index=[${logEntry.index}], term=[${logEntry.term.term}]," +
s" entityId=[${logEntry.event.entityId.map(_.raw)}]," +
s" type=[${logEntry.event.event.getClass.getName}])" +
s" and sending ReplicationSucceeded to [$client] ===",
)
}
client.forward(ReplicationSucceeded(logEntry.event.event, logEntry.index, client.instanceId))
case (logEntry, None) =>
// 復旧中の commit or リーダー昇格時に未コミットのログがあった場合の commit
Expand Down Expand Up @@ -494,11 +501,27 @@ private[raft] class RaftActor(
logEntry.event match {
case EntityEvent(_, NoOp) => // NoOp は replicationActor には関係ないので転送しない
case EntityEvent(Some(entityId), event) =>
if (log.isDebugEnabled) log.debug("=== [{}] applying {} to ReplicationActor ===", currentState, event)
replicationActor(entityId) ! Replica(logEntry)
val targetReplicationActor = replicationActor(entityId)
if (log.isDebugEnabled) {
log.debug(
s"=== [${currentState}] Sending Replica to apply event" +
s" (index=[${logEntry.index}], term=[${logEntry.term.term}]," +
s" entityId=[${entityId.raw}]," +
s" eventType=[${event.getClass.getName}])" +
s" to ReplicationActor [$targetReplicationActor] ===",
)
}
targetReplicationActor ! Replica(logEntry)
case EntityEvent(None, event) =>
if (log.isWarningEnabled)
log.warning("=== [{}] {} was not applied, because it is not assigned any entity ===", currentState, event)
if (log.isWarningEnabled) {
log.warning(
"[{}] event [index=[{}], term=[{}], type={}] was not applied, because it is not assigned any entity",
currentState,
logEntry.index,
logEntry.term.term,
event.getClass.getName,
)
}
}

def handleSnapshotTick(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ private[entityreplication] object SnapshotStore {
selfMemberIndex: MemberIndex,
): Props =
Props(new SnapshotStore(typeName, entityId, settings, selfMemberIndex))

/** Returns a persistence ID of SnapshotStore */
def persistenceId(typeName: TypeName, entityId: NormalizedEntityId, selfMemberIndex: MemberIndex): String =
ActorIds.persistenceId("SnapshotStore", typeName.underlying, entityId.underlying, selfMemberIndex.role)

}

private[entityreplication] class SnapshotStore(
Expand All @@ -29,7 +34,7 @@ private[entityreplication] class SnapshotStore(
import SnapshotProtocol._

override def persistenceId: String =
ActorIds.persistenceId("SnapshotStore", typeName.underlying, entityId.underlying, selfMemberIndex.role)
SnapshotStore.persistenceId(typeName, entityId, selfMemberIndex)

override def journalPluginId: String = settings.journalPluginId

Expand Down Expand Up @@ -91,8 +96,11 @@ private[entityreplication] class SnapshotStore(
cmd.entityId,
)
case FetchSnapshot(_, replyTo) =>
prevSnapshot.foreach { s =>
replyTo ! SnapshotProtocol.SnapshotFound(s)
prevSnapshot match {
case Some(prevSnapshot) =>
replyTo ! SnapshotProtocol.SnapshotFound(prevSnapshot)
case None =>
replyTo ! SnapshotProtocol.SnapshotNotFound(entityId)
}
}
case _: persistence.SaveSnapshotSuccess =>
Expand Down
Loading

0 comments on commit 255b7f5

Please sign in to comment.