Fix inconsistency after InstallSnapshot #128

Merged
merged 32 commits into from
Mar 15, 2022
Changes from 29 commits
e6c8014
Add a multi-jvm test: MultiSnapshotSyncSpec
Feb 14, 2022
bf44ff1
✨feat: Add SnapshotCopied event
negokaz Feb 14, 2022
a34050f
✨feat: Collects SnapshotCopied for snapshot synchronization
negokaz Feb 14, 2022
5b85c79
✨feat: Record progress of snapshot synchronization with SnapshotCopied
negokaz Feb 14, 2022
5d73cc9
🚨test: Add receivePersisted to RaftEventJournalTestKit
negokaz Feb 15, 2022
8bb610d
🚨test: Add expectNothingPersisted to RaftEventJournalTestKit
negokaz Feb 15, 2022
810f557
🚨test: Add createUniqueEntityId
negokaz Feb 15, 2022
7ee04d4
🚨test: Arrange existing test cases to use SnapshotCopied
negokaz Feb 15, 2022
28ad682
🚨test: SnapshotSyncManager should persist SnapshotCopied event every …
negokaz Feb 15, 2022
33ffa53
🔨refactor: Rename CompactionCompletedTag to EntitySnapshotsUpdatedTag
negokaz Feb 15, 2022
cf609b8
📚doc: RaftActor and SnapshotSyncManager have to use the same journal …
negokaz Feb 15, 2022
68d5a45
👷chore: Add akka-stream-testkit dependency in test scope
negokaz Feb 15, 2022
fc213da
🚨test: Persisted SnapshotCopied is tagged
negokaz Feb 15, 2022
abf0209
🐞fix: Apply journalPluginAdditionalConfig to SnapshotSyncManager to e…
negokaz Feb 15, 2022
c658270
🔨refactor: Check dstLatestSnapshotLastLogTerm and dstLatestSnapshotLa…
negokaz Feb 15, 2022
9fd0bdf
🔨refactor: Rename CompactionEnvelope to EntitySnapshotsUpdated
negokaz Feb 15, 2022
0428608
🚀perf: Merge into single EntitySnapshotsUpdated before copying snapshots
negokaz Feb 15, 2022
6dd08ca
🔀Merge remote-tracking branch 'origin/multi-snapshot-sync-spec'
negokaz Feb 15, 2022
7b6e8c7
👷chore: Add mima-filter
negokaz Feb 16, 2022
c1ed3d3
📚doc: Update CHANGELOG
negokaz Feb 16, 2022
e17e416
Fix election-timeout override in MultiSnapshotSyncSpec
Feb 17, 2022
71bf73a
✨feat: Validate 'snapshot-sync.max-snapshot-batch-size'
negokaz Feb 18, 2022
e5762ce
MultiSnapshotSyncSpec verifies compaction completed
Feb 18, 2022
244f3d3
✨feat: Verify ordering of EntitySnapshotsUpdated
negokaz Feb 18, 2022
ba4a77c
🔨refactor: Use mapConcat instead of flatMapConcat for flattening Option
negokaz Feb 18, 2022
ae3a93b
🚨test: Improve RaftEventJournalTestKit
negokaz Feb 18, 2022
ce195f4
🚨test: Set the appropriate snapshotLastLogIndex
negokaz Feb 18, 2022
90c9627
🚨test: Remove '()' from EventStore.persistenceId
negokaz Feb 18, 2022
75e026e
🔀Merge remote-tracking branch 'origin/multi-snapshot-sync-spec' into …
negokaz Feb 18, 2022
385694b
🚨test: Improve test stability
negokaz Feb 24, 2022
86a13dc
🔀Merge branch 'master' into fix-inconsistency-after-installsnapshot
negokaz Feb 28, 2022
c000054
🔀Merge branch 'master' into fix-inconsistency-after-installsnapshot
negokaz Mar 15, 2022
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -30,6 +30,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Starting a follower member later than leader completes a compaction may break ReplicatedLog of the follower [#105](https://github.com/lerna-stack/akka-entity-replication/issues/105)
- The Raft leader uses the same previous `LogEntryIndex` and `Term` to all batched `AppendEntries` messages [#123](https://github.com/lerna-stack/akka-entity-replication/issues/123)
- Raft Actors doesn't accept a `RequestVote(lastLogIndex < log.lastLogIndex, lastLogTerm > log.lastLogTerm)` message [#125](https://github.com/lerna-stack/akka-entity-replication/issues/125)
- `InstallSnapshot` can miss snapshots to copy [PR#128](https://github.com/lerna-stack/akka-entity-replication/pull/128)

⚠️ This change adds a new persistence event, so downgrading after upgrading might not be possible.

## [v2.0.0] - 2021-07-16
[v2.0.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v1.0.0...v2.0.0
1 change: 1 addition & 0 deletions build.sbt
@@ -53,6 +53,7 @@ lazy val lerna = (project in file("."))
// TODO: Switch to an approach that supports the 2.6.x series.
"com.github.dnvriend" %% "akka-persistence-inmemory" % "2.5.15.2" % Test,
"com.typesafe.akka" %% "akka-persistence-testkit" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
),
inConfig(MultiJvm)(
// Required to format the multi-jvm directory
@@ -0,0 +1,8 @@
# It is safe to exclude the following since CompactionCompletedTag (that is renamed to EntitySnapshotsUpdatedTag) is package-private.
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.persistence.CompactionCompletedTag")
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.persistence.CompactionCompletedTag$")
# It is safe to exclude the following since SnapshotSyncManager is package-private.
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager$CompactionEnvelope")
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager$CompactionEnvelope$")
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager$SyncCompleteAll")
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager$SyncCompleteAll$")
8 changes: 8 additions & 0 deletions src/main/protobuf/cluster_replication.proto
@@ -198,6 +198,14 @@ message TimeBasedUUID {
required int64 least_sig_bits = 2;
}

message SnapshotCopied {
required Offset offset = 1;
required MemberIndex member_index = 2;
required NormalizedShardId shard_id = 3;
required Term snapshot_last_log_term = 4;
required LogEntryIndex snapshot_last_log_index = 5;
repeated NormalizedEntityId entity_ids = 6;
}

// ===
// model
8 changes: 8 additions & 0 deletions src/main/resources/reference.conf
@@ -57,6 +57,13 @@ lerna.akka.entityreplication {

// Time to abort operations related to persistence
persistence-operation-timeout = 10s

// Maximum size of a snapshot batch copied from the leader's snapshot store to the local snapshot store
// Note:
// If a single event that updated snapshots contains more entity IDs than this batch size,
// all snapshots that the event indicates are still copied in one batch, exceeding this limit,
// because snapshot copying must be executed atomically per event.
max-snapshot-batch-size = 1000
}

sharding = ${akka.cluster.sharding} {
@@ -92,6 +99,7 @@ lerna.akka.entityreplication {
}
event-adapter-bindings {
"lerna.akka.entityreplication.raft.RaftActor$CompactionCompleted" = akka-entity-replication-raft-event-adapter
"lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager$SnapshotCopied" = akka-entity-replication-raft-event-adapter
}
}
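The batching rule described in the `max-snapshot-batch-size` note can be sketched as follows. This is a minimal, self-contained illustration and not the PR's implementation: `UpdateEvent` and `batchEvents` are hypothetical names, plain strings stand in for entity IDs, and the sketch assumes that events are grouped in order and that an event is never split across batches.

```scala
// Hypothetical stand-in for an event that lists the entities whose snapshots changed.
final case class UpdateEvent(entityIds: Set[String])

object SnapshotBatching {

  /** Groups consecutive events into batches without ever splitting an event.
    * A batch holds at most `maxBatchSize` entity IDs in total (counted per event,
    * without de-duplication), unless a single event alone exceeds the limit.
    */
  def batchEvents(events: Seq[UpdateEvent], maxBatchSize: Int): Vector[Vector[UpdateEvent]] = {
    require(maxBatchSize > 0, s"maxBatchSize ($maxBatchSize) should be larger than 0")
    val initial = (Vector.empty[Vector[UpdateEvent]], Vector.empty[UpdateEvent], 0)
    val (closed, open, _) = events.foldLeft(initial) {
      case ((done, current, size), event) =>
        val eventSize = event.entityIds.size
        if (current.nonEmpty && size + eventSize > maxBatchSize)
          (done :+ current, Vector(event), eventSize) // close the batch and start a new one
        else
          (done, current :+ event, size + eventSize) // the event still fits (or is the first one)
    }
    if (open.isEmpty) closed else closed :+ open
  }
}
```

With a limit of 3, an event carrying five entity IDs ends up alone in its own oversized batch, while smaller neighbouring events are grouped together. Keeping each event whole is what allows progress to be recorded per event.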

@@ -46,11 +46,12 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
// raft.snapshot
private val EntitySnapshotManifest = "DA"
// raft.snapshot.sync
private val SyncCompletedManifest = "EA"
private val SyncProgressManifest = "EB"
private val NoOffsetManifest = "EC"
private val SequenceManifest = "ED"
private val TimeBasedUUIDManifest = "EE"
private val SyncCompletedManifest = "EA"
private val SyncProgressManifest = "EB"
private val NoOffsetManifest = "EC"
private val SequenceManifest = "ED"
private val TimeBasedUUIDManifest = "EE"
private val SnapshotCopiedManifest = "EF"
// raft.model
private val NoOpManifest = "FA"
// typed
@@ -87,11 +88,12 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
// raft.snapshot
EntitySnapshotManifest -> entitySnapshotFromBinary,
// raft.snapshot.sync
SyncCompletedManifest -> syncCompletedFromBinary,
SyncProgressManifest -> syncProgressFromBinary,
NoOffsetManifest -> noOffsetEnvelopeFromBinary,
SequenceManifest -> sequenceEnvelopeFromBinary,
TimeBasedUUIDManifest -> timeBasedUUIDEnvelopeFromBinary,
SyncCompletedManifest -> syncCompletedFromBinary,
SyncProgressManifest -> syncProgressFromBinary,
NoOffsetManifest -> noOffsetEnvelopeFromBinary,
SequenceManifest -> sequenceEnvelopeFromBinary,
TimeBasedUUIDManifest -> timeBasedUUIDEnvelopeFromBinary,
SnapshotCopiedManifest -> snapshotCopiedFromBinary,
// raft.model
NoOpManifest -> noOpFromBinary,
// typed
@@ -156,11 +158,12 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
// raft.snapshot
case _: raft.snapshot.SnapshotProtocol.EntitySnapshot => EntitySnapshotManifest
// raft.snapshot.sync
case _: raft.snapshot.sync.SnapshotSyncManager.SyncCompleted => SyncCompletedManifest
case _: raft.snapshot.sync.SnapshotSyncManager.SyncProgress => SyncProgressManifest
case _: NoOffsetEnvelope.type => NoOffsetManifest
case _: SequenceEnvelope => SequenceManifest
case _: TimeBasedUUIDEnvelope => TimeBasedUUIDManifest
case _: raft.snapshot.sync.SnapshotSyncManager.SyncCompleted => SyncCompletedManifest
case _: raft.snapshot.sync.SnapshotSyncManager.SyncProgress => SyncProgressManifest
case _: NoOffsetEnvelope.type => NoOffsetManifest
case _: SequenceEnvelope => SequenceManifest
case _: TimeBasedUUIDEnvelope => TimeBasedUUIDManifest
case _: raft.snapshot.sync.SnapshotSyncManager.SnapshotCopied => SnapshotCopiedManifest
// raft.model
case _: raft.model.NoOp.type => NoOpManifest
// typed
@@ -197,11 +200,12 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
// raft.snapshot
case m: raft.snapshot.SnapshotProtocol.EntitySnapshot => entitySnapShotToBinary(m)
// raft.snapshot.sync
case m: raft.snapshot.sync.SnapshotSyncManager.SyncCompleted => syncCompletedToBinary(m)
case m: raft.snapshot.sync.SnapshotSyncManager.SyncProgress => syncProgressToBinary(m)
case m: NoOffsetEnvelope.type => noOffsetEnvelopeToBinary(m)
case m: SequenceEnvelope => sequenceEnvelopeToBinary(m)
case m: TimeBasedUUIDEnvelope => timeBasedUUIDEnvelopeToBinary(m)
case m: raft.snapshot.sync.SnapshotSyncManager.SyncCompleted => syncCompletedToBinary(m)
case m: raft.snapshot.sync.SnapshotSyncManager.SyncProgress => syncProgressToBinary(m)
case m: NoOffsetEnvelope.type => noOffsetEnvelopeToBinary(m)
case m: SequenceEnvelope => sequenceEnvelopeToBinary(m)
case m: TimeBasedUUIDEnvelope => timeBasedUUIDEnvelopeToBinary(m)
case m: raft.snapshot.sync.SnapshotSyncManager.SnapshotCopied => snapshotCopiedToBinary(m)
// raft.model
case m: raft.model.NoOp.type => noOpToBinary(m)
// typed
@@ -790,6 +794,30 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
)
}

private def snapshotCopiedToBinary(message: raft.snapshot.sync.SnapshotSyncManager.SnapshotCopied): Array[Byte] = {
msg.SnapshotCopied
.of(
offset = offsetToProto(message.offset),
memberIndex = memberIndexToProto(message.memberIndex),
shardId = normalizedShardIdToProto(message.shardId),
snapshotLastLogTerm = termToProto(message.snapshotLastLogTerm),
snapshotLastLogIndex = logEntryIndexToProto(message.snapshotLastLogIndex),
entityIds = message.entityIds.map(normalizedEntityIdToProto).toSeq,
).toByteArray
}

private def snapshotCopiedFromBinary(bytes: Array[Byte]): raft.snapshot.sync.SnapshotSyncManager.SnapshotCopied = {
val proto = msg.SnapshotCopied.parseFrom(bytes)
raft.snapshot.sync.SnapshotSyncManager.SnapshotCopied(
offset = offsetFromProto(proto.offset),
memberIndex = memberIndexFromProto(proto.memberIndex),
shardId = normalizedShardIdFromProto(proto.shardId),
snapshotLastLogTerm = termFromProto(proto.snapshotLastLogTerm),
snapshotLastLogIndex = logEntryIndexFromProto(proto.snapshotLastLogIndex),
entityIds = proto.entityIds.map(normalizedEntityIdFromProto).toSet,
)
}

// ===
// model
// ===
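The `entityIds` handling in `snapshotCopiedToBinary` and `snapshotCopiedFromBinary` relies on a `Set` surviving a trip through a `repeated` field. A minimal sketch of that round trip, assuming simplified stand-in types (`SnapshotCopiedProto`, `SnapshotCopiedModel`, and both helper functions are hypothetical, and strings replace `NormalizedEntityId`):

```scala
object EntityIdsRoundTrip {
  // Hypothetical stand-in for the generated protobuf message: a repeated field is an ordered Seq.
  final case class SnapshotCopiedProto(entityIds: Seq[String])
  // Hypothetical stand-in for the domain event: entity IDs are an unordered Set.
  final case class SnapshotCopiedModel(entityIds: Set[String])

  // Set -> Seq: the element order on the wire is unspecified.
  def toProto(model: SnapshotCopiedModel): SnapshotCopiedProto =
    SnapshotCopiedProto(model.entityIds.toSeq)

  // Seq -> Set: order is dropped again and duplicates (if any) collapse.
  def fromProto(proto: SnapshotCopiedProto): SnapshotCopiedModel =
    SnapshotCopiedModel(proto.entityIds.toSet)
}
```

Because equality on `Set` ignores ordering, `fromProto(toProto(event))` equals `event` regardless of the order in which the IDs were written.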
@@ -158,6 +158,12 @@ private[raft] class RaftActor(
override val persistenceId: String =
ActorIds.persistenceId("raft", typeName.underlying, shardId.underlying, selfMemberIndex.role)

/**
* NOTE:
* [[RaftActor]] has to use the same journal plugin as [[SnapshotSyncManager]]
* because snapshot synchronization is achieved by reading both the events
* [[CompactionCompleted]] which [[RaftActor]] persisted and SnapshotCopied which [[SnapshotSyncManager]] persisted.
*/
override def journalPluginId: String = settings.journalPluginId

override def journalPluginConfig: Config = settings.journalPluginAdditionalConfig
@@ -46,6 +46,8 @@ trait RaftSettings {

def snapshotSyncPersistenceOperationTimeout: FiniteDuration

def snapshotSyncMaxSnapshotBatchSize: Int

def clusterShardingConfig: Config

def raftActorAutoStartFrequency: FiniteDuration
@@ -25,6 +25,7 @@ private[entityreplication] final case class RaftSettingsImpl(
compactionLogSizeCheckInterval: FiniteDuration,
snapshotSyncCopyingParallelism: Int,
snapshotSyncPersistenceOperationTimeout: FiniteDuration,
snapshotSyncMaxSnapshotBatchSize: Int,
clusterShardingConfig: Config,
raftActorAutoStartFrequency: FiniteDuration,
raftActorAutoStartNumberOfActors: Int,
@@ -140,6 +141,13 @@ private[entityreplication] object RaftSettingsImpl {
val snapshotSyncPersistenceOperationTimeout: FiniteDuration =
config.getDuration("snapshot-sync.persistence-operation-timeout").toScala

val snapshotSyncMaxSnapshotBatchSize: Int =
config.getInt("snapshot-sync.max-snapshot-batch-size")
require(
snapshotSyncMaxSnapshotBatchSize > 0,
s"snapshot-sync.max-snapshot-batch-size (${snapshotSyncMaxSnapshotBatchSize}) should be larger than 0",
)

val clusterShardingConfig: Config = config.getConfig("sharding")

val raftActorAutoStartFrequency: FiniteDuration =
@@ -196,6 +204,7 @@ private[entityreplication] object RaftSettingsImpl {
compactionLogSizeCheckInterval = compactionLogSizeCheckInterval,
snapshotSyncCopyingParallelism = snapshotSyncCopyingParallelism,
snapshotSyncPersistenceOperationTimeout = snapshotSyncPersistenceOperationTimeout,
snapshotSyncMaxSnapshotBatchSize = snapshotSyncMaxSnapshotBatchSize,
clusterShardingConfig = clusterShardingConfig,
raftActorAutoStartFrequency = raftActorAutoStartFrequency,
raftActorAutoStartNumberOfActors = raftActorAutoStartNumberOfActors,
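The fail-fast check on `snapshot-sync.max-snapshot-batch-size` can be tried in isolation. The sketch below is an assumption-laden simplification: `readMaxSnapshotBatchSize` is a hypothetical helper, and a plain `Map` stands in for the Typesafe Config object that `RaftSettingsImpl` actually reads.

```scala
import scala.util.Try

object BatchSizeValidation {
  private val Key = "snapshot-sync.max-snapshot-batch-size"

  // Hypothetical helper: reads the setting and rejects non-positive values immediately.
  def readMaxSnapshotBatchSize(config: Map[String, Int]): Int = {
    val value = config(Key)
    // `require` throws IllegalArgumentException("requirement failed: <message>") when the check fails.
    require(value > 0, s"$Key ($value) should be larger than 0")
    value
  }

  // Convenience wrapper that captures the failure instead of throwing.
  def tryRead(config: Map[String, Int]): Try[Int] = Try(readMaxSnapshotBatchSize(config))
}
```

Validating while the settings are loaded surfaces a misconfiguration at startup rather than during the first snapshot synchronization.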
@@ -3,11 +3,12 @@ package lerna.akka.entityreplication.raft.persistence
import lerna.akka.entityreplication.model.NormalizedShardId
import lerna.akka.entityreplication.raft.routing.MemberIndex

private[entityreplication] final case class CompactionCompletedTag(
private[entityreplication] final case class EntitySnapshotsUpdatedTag(
memberIndex: MemberIndex,
shardId: NormalizedShardId,
) {
private[this] val delimiter = ":"

// Do not change this tag format for compatibility
override def toString: String = s"CompactionCompleted${delimiter}${shardId.underlying}${delimiter}${memberIndex.role}"
}
@@ -2,6 +2,7 @@ package lerna.akka.entityreplication.raft.persistence

import akka.persistence.journal.{ EventAdapter, EventSeq, Tagged }
import lerna.akka.entityreplication.raft.RaftActor.CompactionCompleted
import lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager.SnapshotCopied

private[entityreplication] class RaftEventAdapter extends EventAdapter {

@@ -12,7 +13,9 @@ private[entityreplication] class RaftEventAdapter extends EventAdapter {
override def toJournal(event: Any): Any = {
event match {
case event: CompactionCompleted =>
Tagged(event, Set(CompactionCompletedTag(event.memberIndex, event.shardId).toString))
Tagged(event, Set(EntitySnapshotsUpdatedTag(event.memberIndex, event.shardId).toString))
case event: SnapshotCopied =>
Tagged(event, Set(EntitySnapshotsUpdatedTag(event.memberIndex, event.shardId).toString))
case event => event
}
}
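The effect of the adapter change is that `CompactionCompleted` and `SnapshotCopied` share one tag, so a reader following that tag sees both kinds of event. A minimal sketch of the idea, assuming simplified stand-ins: the `Tagged` class and both event classes below are hypothetical local types (not Akka's `Tagged` or the library's events), and strings replace `MemberIndex` and `NormalizedShardId`.

```scala
object TaggingSketch {
  // Hypothetical local stand-in for akka.persistence.journal.Tagged.
  final case class Tagged(payload: Any, tags: Set[String])

  // Hypothetical simplified events; only the fields needed for tagging are kept.
  final case class CompactionCompleted(memberIndexRole: String, shardId: String)
  final case class SnapshotCopied(memberIndexRole: String, shardId: String, entityIds: Set[String])

  final case class EntitySnapshotsUpdatedTag(memberIndexRole: String, shardId: String) {
    // The "CompactionCompleted" prefix is kept after the rename for compatibility.
    override def toString: String = s"CompactionCompleted:$shardId:$memberIndexRole"
  }

  // Both snapshot-updating events receive the same tag; everything else passes through untouched.
  def toJournal(event: Any): Any = event match {
    case e: CompactionCompleted =>
      Tagged(e, Set(EntitySnapshotsUpdatedTag(e.memberIndexRole, e.shardId).toString))
    case e: SnapshotCopied =>
      Tagged(e, Set(EntitySnapshotsUpdatedTag(e.memberIndexRole, e.shardId).toString))
    case other => other
  }
}
```

Renaming the class while leaving the tag string unchanged means events written before the rename still match the same tag.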