Tracks progress of event sourcing #137

Merged: 21 commits merged on Mar 17, 2022

Commits (21)
- `9df71dd` Test compaction preserves not-sourced committed events (Mar 14, 2022)
- `26f83b6` CommitLogStoreActor handles new command AppendCommittedEntries (Mar 9, 2022)
- `eb61737` RaftActor tracks the progress of event sourcing (Mar 10, 2022)
- `655e56a` RaftActor skips new compaction if the compaction cannot delete enough… (Mar 14, 2022)
- `22c1d6d` Remove sending raft.eventsourced.Save (Mar 14, 2022)
- `3531757` Revise implementation guides (Mar 14, 2022)
- `b3dfc4c` RaftActor sends multiple batched AppendCommittedEntries (Mar 14, 2022)
- `759968e` Add MiMa excludes (Mar 14, 2022)
- `01754e0` Deprecate raft.eventsourced.Save class (Mar 15, 2022)
- `6fae15a` Require compaction.preserve-log-size is less than compaction.log-size… (Mar 15, 2022)
- `e3afbd0` RaftMemberData.estimatedReplicatedLogSizeAfterCompaction throws Illeg… (Mar 15, 2022)
- `97f2b35` RaftMemberData.compactReplicatedLog throws IllegalArgumentException e… (Mar 15, 2022)
- `e20cc9a` Revise CHANGELOG (Mar 15, 2022)
- `9291753` Revise settings of ReplicationActorMultiNodeSpec (Mar 15, 2022)
- `eddcb67` Merge remote-tracking branch 'origin/master' into event-sourcing-prog… (Mar 15, 2022)
- `efdfccc` Wrap `log.is{Info,Debug}Enabled` around `log.{info,debug}` (Mar 16, 2022)
- `9058d62` Remove `===` prefix from not-debug log messages (Mar 16, 2022)
- `8f01846` Describe which parts are not the command-side settings (Mar 16, 2022)
- `3bb9a43` Fix the wrong ancestorLastTerm calculation (Mar 16, 2022)
- `872e4e4` Add diagnostic infomation (lastApplied,eventSourcingIndex,preserveLog… (Mar 16, 2022)
- `9ee278b` Prioritize the warn log over the info log (Mar 16, 2022)
21 changes: 21 additions & 0 deletions CHANGELOG.md
@@ -18,6 +18,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
This feature is enabled only by using `typed.ClusterReplication`.
It is highly recommended that you switch to the typed API since the classic API is deprecated.

- Raft actors track the progress of event sourcing [#136](https://github.com/lerna-stack/akka-entity-replication/issues/136).

This feature ensures that
- Event Sourcing won't halt even if the event-sourcing store is unavailable for a long period.
After the event-sourcing store recovers, Event Sourcing will work again automatically.
- Compaction won't delete committed events that are not persisted to the event-sourcing store yet.

It adds the following new settings (for more details, please see `reference.conf`):
- `lerna.akka.entityreplication.raft.eventsourced.committed-log-entries-check-interval`
- `lerna.akka.entityreplication.raft.eventsourced.max-append-committed-entries-size`
- `lerna.akka.entityreplication.raft.eventsourced.max-append-committed-entries-batch-size`

It removes the following settings:
- `lerna.akka.entityreplication.raft.eventsourced.commit-log-store.retry.attempts`
- `lerna.akka.entityreplication.raft.eventsourced.commit-log-store.retry.delay`

It requires that
`lerna.akka.entityreplication.raft.compaction.preserve-log-size` be less than
`lerna.akka.entityreplication.raft.compaction.log-size-threshold` (a configuration sketch follows below).
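As an illustration of the new requirement, an `application.conf` override that keeps compaction valid might look like the following minimal sketch; the keys come from `reference.conf`, and the values shown are only illustrative:

```hocon
lerna.akka.entityreplication.raft.compaction {
// Compaction runs once the replicated log grows past this threshold.
log-size-threshold = 50000
// Entries preserved from compaction; must be greater than 0 and strictly
// less than log-size-threshold, otherwise instantiating RaftSettings fails.
preserve-log-size = 10000
}
```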


### Changed
- Bump up Akka version to 2.6.17 [PR#98](https://github.com/lerna-stack/akka-entity-replication/pull/98)

102 changes: 2 additions & 100 deletions docs/implementation_guide.md
@@ -166,81 +166,8 @@ AtLeastOnceComplete.askTo(

### Configuration

On the command side, there are the following settings.
On the command side, the related settings are defined at `lerna.akka.entityreplication` (except `lerna.akka.entityreplication.raft.eventsourced`) in [reference.conf](/src/main/resources/reference.conf).

```hocon
lerna.akka.entityreplication {

// How long to wait before giving up entity recovery.
// Entity recovery requires a snapshot, and a failure to fetch it will cause this timeout.
// If it times out, entity recovery will be retried.
recovery-entity-timeout = 10s

raft {
// The time it takes to start electing a new leader after the heartbeat is no longer received from the leader.
election-timeout = 750 ms

// The interval between leaders sending heartbeats to their followers
heartbeat-interval = 100 ms

// A role to identify the nodes to place replicas on
// The number of roles is the number of replicas. It is recommended to set up at least three roles.
multi-raft-roles = ["replica-group-1", "replica-group-2", "replica-group-3"]

// Maximum number of entries which AppendEntries contains.
// A too-large size will cause message serialization failure.
max-append-entries-size = 16

// The maximum number of AppendEntries that will be sent at once at every heartbeat-interval.
max-append-entries-batch-size = 10

// log compaction settings
compaction {

// Time interval to check the size of the log and check if a snapshotting is needed to be taken
log-size-check-interval = 10s

// Threshold for saving snapshots and compaction of the log.
// If this value is too large, your application will use a lot of memory and you may get an OutOfMemoryError.
// If this value is too small, compaction may occur frequently and overload the application and the data store.
log-size-threshold = 50000

// Number of log entries to preserve during compaction to avoid log replication failure.
// If more log entries than this value cannot be synchronized, the raft member will be unavailable.
// It is recommended to set this value well below log-size-threshold. Otherwise, compaction will run at every log-size-check-interval.
preserve-log-size = 10000

// Time to keep a cache of snapshots in memory
snapshot-cache-time-to-live = 10s
}

// snapshot synchronization settings
snapshot-sync {

// Number of snapshots of entities that are copied in parallel
snapshot-copying-parallelism = 10

// Time to abort operations related to persistence
persistence-operation-timeout = 10s
}

// data persistent settings
persistence {
// Absolute path to the journal plugin configuration entry.
// The journal stores events related to Raft.
journal.plugin = ""

// Absolute path to the snapshot store plugin configuration entry.
// The snapshot store stores state related to Raft.
snapshot-store.plugin = ""

// Absolute path to the query plugin configuration entry.
// Snapshot synchronization reads events related to Raft.
query.plugin = ""
}
}
}
```
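Since the defaults above were moved into `reference.conf`, a typical `application.conf` now only overrides the few command-side keys it needs. The following is a minimal, illustrative sketch; the keys come from `reference.conf`, while the plugin IDs are hypothetical placeholders that would be defined elsewhere in your configuration:

```hocon
lerna.akka.entityreplication {
  // Roles that identify the nodes hosting each replica group.
  raft.multi-raft-roles = ["replica-group-1", "replica-group-2", "replica-group-3"]

  // Compaction thresholds; preserve-log-size must stay below log-size-threshold.
  raft.compaction {
    log-size-threshold = 50000
    preserve-log-size  = 10000
  }

  // Command-side persistence plugins (hypothetical plugin IDs).
  raft.persistence {
    journal.plugin        = "my-app.raft-journal"
    snapshot-store.plugin = "my-app.raft-snapshot-store"
    query.plugin          = "my-app.raft-query"
  }
}
```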

## Read Side

@@ -358,33 +285,8 @@ object EventHandler {

### Configuration

On the read side, there are the following settings.

```hocon
lerna.akka.entityreplication.raft.eventsourced {
// Settings for saving committed events from each RaftActor
commit-log-store {
// Retry setting to prevent events from being lost if commit-log-store (sharding) stops temporarily
retry {
attempts = 15
delay = 3 seconds
}
}

persistence {
// Absolute path to the journal plugin configuration entry.
// The journal stores Raft-committed events.
journal.plugin = ""

// Absolute path to the snapshot-store plugin configuration entry.
// The snapshot-store stores a state (snapshot) built from Raft-committed events.
snapshot-store.plugin = ""
// Snapshot after this number of events.
snapshot-every = 1000
}
}
```

On the read side, the related settings are defined at `lerna.akka.entityreplication.raft.eventsourced` in [reference.conf](/src/main/resources/reference.conf).
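A minimal read-side override usually only needs the persistence entries. The sketch below is illustrative; the keys come from `reference.conf`, and the plugin IDs are hypothetical placeholders defined elsewhere in your configuration:

```hocon
lerna.akka.entityreplication.raft.eventsourced {
  persistence {
    // Journal that stores Raft-committed events (hypothetical plugin ID).
    journal.plugin = "my-app.eventsourced-journal"
    // Snapshot store for the state built from those events (hypothetical plugin ID).
    snapshot-store.plugin = "my-app.eventsourced-snapshot-store"
    // Take a snapshot after this number of events.
    snapshot-every = 1000
  }
}
```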

## Persistence plugin configuration

101 changes: 2 additions & 99 deletions docs/typed/implementation_guide.md
@@ -360,87 +360,8 @@ This is useful when you would like to change the datastore that persists events

### Configuration

On the command side, there are the following settings.
On the command side, the related settings are defined at `lerna.akka.entityreplication` (except `lerna.akka.entityreplication.raft.eventsourced`) in [reference.conf](/src/main/resources/reference.conf).

```hocon
lerna.akka.entityreplication {

// How long to wait before giving up entity recovery.
// Entity recovery requires a snapshot, and a failure to fetch it will cause this timeout.
// If it times out, entity recovery will be retried.
recovery-entity-timeout = 10s

raft {
// The time it takes to start electing a new leader after the heartbeat is no longer received from the leader.
election-timeout = 750 ms

// The interval between leaders sending heartbeats to their followers
heartbeat-interval = 100 ms

// A role to identify the nodes to place replicas on
// The number of roles is the number of replicas. It is recommended to set up at least three roles.
multi-raft-roles = ["replica-group-1", "replica-group-2", "replica-group-3"]

// Number of shards per single multi-raft-role, used only by the typed APIs.
// This value must be the same for all nodes in the cluster
// and must not be changed after starting to use.
// Changing this value will cause data inconsistency.
number-of-shards = 100

// Maximum number of entries which AppendEntries contains.
// A too-large size will cause message serialization failure.
max-append-entries-size = 16

// The maximum number of AppendEntries that will be sent at once at every heartbeat-interval.
max-append-entries-batch-size = 10

// log compaction settings
compaction {

// Time interval to check the size of the log and check if a snapshotting is needed to be taken
log-size-check-interval = 10s

// Threshold for saving snapshots and compaction of the log.
// If this value is too large, your application will use a lot of memory and you may get an OutOfMemoryError.
// If this value is too small, compaction may occur frequently and overload the application and the data store.
log-size-threshold = 50000

// Number of log entries to preserve during compaction to avoid log replication failure.
// If more log entries than this value cannot be synchronized, the raft member will be unavailable.
// It is recommended to set this value well below log-size-threshold. Otherwise, compaction will run at every log-size-check-interval.
preserve-log-size = 10000

// Time to keep a cache of snapshots in memory
snapshot-cache-time-to-live = 10s
}

// snapshot synchronization settings
snapshot-sync {

// Number of snapshots of entities that are copied in parallel
snapshot-copying-parallelism = 10

// Time to abort operations related to persistence
persistence-operation-timeout = 10s
}

// data persistent settings
persistence {
// Absolute path to the journal plugin configuration entry.
// The journal stores events related to Raft.
journal.plugin = ""

// Absolute path to the snapshot store plugin configuration entry.
// The snapshot store stores state related to Raft.
snapshot-store.plugin = ""

// Absolute path to the query plugin configuration entry.
// Snapshot synchronization reads events related to Raft.
query.plugin = ""
}
}
}
```
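For the typed API, the one command-side key without a classic counterpart is `number-of-shards`. The following is a minimal override sketch with illustrative values (the keys come from `reference.conf`; the value must be identical on every node and must not change once the system has stored data):

```hocon
lerna.akka.entityreplication.raft {
  // Used only by the typed API; must be the same on every node
  // and must never change after the system has stored data.
  number-of-shards = 100

  // Leader-election tuning, shared with the classic API (illustrative values).
  election-timeout   = 750 ms
  heartbeat-interval = 100 ms
}
```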

## Read Side

Expand Down Expand Up @@ -563,26 +484,8 @@ You can set an arbitrary value however you cannot change the value easily after

### Configuration

On the read side, there are the following settings.
On the read side, the related settings are defined at `lerna.akka.entityreplication.raft.eventsourced` in [reference.conf](/src/main/resources/reference.conf).

```hocon
lerna.akka.entityreplication.raft.eventsourced {
// Settings for saving committed events from each RaftActor
commit-log-store {
// Retry setting to prevent events from being lost if commit-log-store (sharding) stops temporarily
retry {
attempts = 15
delay = 3 seconds
}
}

persistence {
// Absolute path to the journal plugin configuration entry.
// The journal stores Raft-committed events.
journal.plugin = ""
}
}
```

## Persistence plugin configuration

@@ -0,0 +1,3 @@
# It is safe to exclude the following since these classes are package private.
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.eventsourced.CommitLogStore")
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.eventsourced.ShardedCommitLogStore")
10 changes: 10 additions & 0 deletions src/main/protobuf/cluster_replication.proto
@@ -81,6 +81,16 @@ message CommitLogStoreActorState {
required LogEntryIndex current_index = 1;
}

message CommitLogStoreAppendCommittedEntries {
required NormalizedShardId shard_id = 1;
repeated LogEntry entries = 2;
}

message CommitLogStoreAppendCommittedEntriesResponse {
required LogEntryIndex current_index = 1;
}


// ===
// raft.protocol
// ===
29 changes: 20 additions & 9 deletions src/main/resources/reference.conf
@@ -42,7 +42,7 @@ lerna.akka.entityreplication {

// Preserving log entries from log reduction to avoid log replication failure.
// If more number of logs than this value cannot be synchronized, the raft member will be unavailable.
// It is recommended to set this value well below log-size-threshold. Otherwise, compaction will run at every log-size-check-interval.
// This value should be less than `log-size-threshold` and greater than 0. Otherwise, instantiating RaftSettings will fail.
preserve-log-size = 10000

// Time to keep a cache of snapshots in memory
@@ -114,14 +114,25 @@ lerna.akka.entityreplication {
}

raft.eventsourced {
// Settings for saving committed events from each RaftActor
commit-log-store {
// Retry setting to prevent events from being lost if commit-log-store (sharding) stops temporarily
retry {
attempts = 15
delay = 3 seconds
}
}

// Interval at which the Raft leader checks its committed log entries
//
// When new committed log entries are available, the leader sends these new entries to the event-sourcing store (a.k.a. CommitLogStore).
// This interval should be sufficiently larger than network latencies since the CommitLogStore might run on a node other than the leader's.
// If this interval is smaller than such latencies, the leader sends the same entry multiple times, which wastes network resources.
committed-log-entries-check-interval = 100ms

// Maximum number of entries AppendCommittedEntries contains.
// The default value is the same as `raft.max-append-entries-size`.
// A too-large value might cause message serialization failure.
max-append-committed-entries-size = ${lerna.akka.entityreplication.raft.max-append-entries-size}

// Maximum number of AppendCommittedEntries to send at once at every `committed-log-entries-check-interval`.
// The default value is the same as `raft.max-append-entries-batch-size`.
// If there are many not-yet-persisted committed entries,
// * A too-large value might cause temporary network overload
// * A too-small value might cause event sourcing to take more time to catch up to the latest state.
max-append-committed-entries-batch-size = ${lerna.akka.entityreplication.raft.max-append-entries-batch-size}

persistence {
// Absolute path to the journal plugin configuration entry.
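Taken together, the three new keys bound how quickly committed entries reach the event-sourcing store: at most `max-append-committed-entries-size * max-append-committed-entries-batch-size` entries are forwarded per `committed-log-entries-check-interval`. The sketch below restates the defaults with the arithmetic spelled out; the values are illustrative, not a tuning recommendation:

```hocon
lerna.akka.entityreplication.raft.eventsourced {
  // With these values (16 entries per AppendCommittedEntries message,
  // 10 messages per check, one check every 100 ms), at most
  // 16 * 10 = 160 committed entries are forwarded per 100 ms.
  committed-log-entries-check-interval = 100ms
  max-append-committed-entries-size = 16
  max-append-committed-entries-batch-size = 10
}
```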
@@ -3,7 +3,7 @@ package lerna.akka.entityreplication
import akka.actor.{ Actor, ActorRef, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider, Props, Status }
import lerna.akka.entityreplication.ClusterReplication.EntityPropsProvider
import lerna.akka.entityreplication.model.TypeName
import lerna.akka.entityreplication.raft.eventsourced.{ CommitLogStore, ShardedCommitLogStore }
import lerna.akka.entityreplication.raft.eventsourced.CommitLogStoreActor
import lerna.akka.entityreplication.util.ActorIds
import akka.util.Timeout
import akka.pattern.ask
@@ -92,12 +92,8 @@ private[entityreplication] class ClusterReplicationGuardian extends Actor {
val _typeName = TypeName.from(typeName)
val regionName = ActorIds.actorName(_typeName.underlying)

val maybeCommitLogStore: Option[CommitLogStore] = {
// TODO: Specify whether RMU is enabled or disabled via config
val enabled = true // FIXME: Obtain this from settings (must be switchable per typeName)
// TODO: Make this replaceable for testing
Option.when(enabled)(new ShardedCommitLogStore(_typeName, context.system, settings))
}
val commitLogStore: ActorRef =
CommitLogStoreActor.startClusterSharding(_typeName, context.system, settings)

val regionRef: ActorRef =
context.child(regionName) match {
@@ -111,7 +107,7 @@
extractEntityId,
extractShardId,
possibleShardIds,
maybeCommitLogStore,
commitLogStore,
),
regionName,
)