Disabling specific raft actor's launching by its own shard id #173

Merged 18 commits on Oct 20, 2022

Changes from all commits

Commits (18)
c323580
feat: add configuration of disabled shard ids list
nyamada-tis Oct 5, 2022
e0ab8d8
feat: exclude disabled shard ids from given ids when raft actor's sta…
nyamada-tis Oct 5, 2022
9bf36da
test: add test of disabling specific shard ids launching when raft ac…
nyamada-tis Oct 5, 2022
79a562c
refactor: change the type of RaftSettings.disabledShards from List to…
nyamada-tis Oct 6, 2022
ad70cc7
feat: drop all messages sent to disabled shards in ReplicationRegion.…
nyamada-tis Oct 7, 2022
1c6bd7a
test: add test of dropping all messages sent to disabled shards
nyamada-tis Oct 7, 2022
ba515b4
feat: RaftActor stop itself when its shard id is defined as disabled
nyamada-tis Oct 7, 2022
9227ea8
test: RaftActor stop itself when its shard id is defined as disabled
nyamada-tis Oct 7, 2022
c79751e
feat: RaftActor skip recovery when its shard id is disabled
nyamada-tis Oct 11, 2022
48d1428
doc: update CHANGELOG.md
nyamada-tis Oct 11, 2022
aae0843
chore: add example of disabled-shards configuration
nyamada-tis Oct 19, 2022
bad6267
test: remove unnecessary code
nyamada-tis Oct 19, 2022
493389d
refactor: add shard id to warn log caused by deliverling message to d…
nyamada-tis Oct 19, 2022
c655fa2
refactor: add a type annotation to value definition of disabledShards
nyamada-tis Oct 19, 2022
9c7c04a
chore: move disable-shards under num-of-shards
nyamada-tis Oct 19, 2022
2622d6b
test: add test of disabledShards to RaftSettingsSpec
nyamada-tis Oct 19, 2022
5f8f982
feat: CommitLogActor stop itself when its shard id is defined as disa…
nyamada-tis Oct 19, 2022
44e7a8a
test: CommitLogActor stop itself when its shard id is defined as disa…
nyamada-tis Oct 19, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[PR#164](https://github.com/lerna-stack/akka-entity-replication/pull/164)
- Add function extracting shard id from entity id to lerna.akka.entityreplication.typed.ClusterReplication
[PR#172](https://github.com/lerna-stack/akka-entity-replication/pull/172)
- Add ability to disable specific raft actors by shard id [PR#173](https://github.com/lerna-stack/akka-entity-replication/pull/173)

### Fixed
- RaftActor might delete committed entries
5 changes: 5 additions & 0 deletions src/main/resources/reference.conf
@@ -22,6 +22,10 @@ lerna.akka.entityreplication {
// Changing this value will cause data inconsistency.
number-of-shards = 100

// Shard Ids of raft actors to disable.
// e.g. ["2", "5"]
disabled-shards = []

// Maximum number of entries which AppendEntries contains.
// The too large size will cause message serialization failure.
max-append-entries-size = 16
@@ -111,6 +115,7 @@ lerna.akka.entityreplication {
// Snapshot synchronization reads events that related to Raft.
query.plugin = ""
}

}

raft.eventsourced {
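For quick reference, here is a sketch of how the new setting is consumed, mirroring the pattern the specs in this PR use. The shard ids "2" and "5" are placeholders, and RaftSettings is an internal API that the specs reach from inside the package; a real application would simply set the key in its application.conf.

import com.typesafe.config.ConfigFactory
import lerna.akka.entityreplication.raft.RaftSettings

// Overlay the new key on top of the library defaults from reference.conf.
val config = ConfigFactory
  .parseString("""
    |lerna.akka.entityreplication.raft.disabled-shards = ["2", "5"]
    |""".stripMargin)
  .withFallback(ConfigFactory.load())

// Raft actors (and CommitLogStoreActors) whose shard id is "2" or "5" skip
// recovery and stop themselves on start; messages routed to those shards
// are dropped with a warning.
val settings = RaftSettings(config)
assert(settings.disabledShards == Set("2", "5"))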
@@ -137,6 +137,8 @@ private[entityreplication] class ReplicationRegion(
private[this] val regions: Map[MemberIndex, mutable.Set[Member]] =
allMemberIndexes.map(i => i -> mutable.Set.empty[Member]).toMap

private val disabledShards: Set[ShardId] = settings.raftSettings.disabledShards

// TODO Rename this variable to reflect what it actually holds
private[this] val shardingRouters: Map[MemberIndex, ActorRef] = allMemberIndexes.map { memberIndex =>
def clusterReplicationShardId(message: Any): String = extractNormalizedShardIdInternal(message).raw
@@ -255,11 +257,19 @@ private[entityreplication] class ReplicationRegion(
def deliverMessage(message: Any): Unit = {
if (extractShardId.isDefinedAt(message)) {
val shardId = extractShardId(message)
shardingRouters.values.foreach(
// Don't forward StartEntity to prevent leaking StartEntityAck
_.tell(ShardRegion.StartEntity(shardId), context.system.deadLetters),
)
handleRoutingCommand(DeliverSomewhere(Command(message)))
if (!disabledShards.contains(shardId)) {
shardingRouters.values.foreach(
// Don't forward StartEntity to prevent leaking StartEntityAck
_.tell(ShardRegion.StartEntity(shardId), context.system.deadLetters),
)
handleRoutingCommand(DeliverSomewhere(Command(message)))
} else if (log.isWarningEnabled) {
log.warning(
s"Following command had sent to disabled shards was dropped: {}(shardId={})",
message.getClass.getName,
shardId,
)
}
} else {
if (log.isWarningEnabled)
log.warning("The message [{}] was dropped because its shard ID could not be extracted", message)
@@ -35,6 +35,12 @@ private[entityreplication] object ReplicationRegionRaftActorStarter {
): Behavior[Nothing] = {
Behaviors
.setup[Command] { context =>
val (disableIds, enableIds) = ids.partition(settings.disabledShards.contains)
if (disableIds.nonEmpty) {
context.log.info(
s"Following disabled shard ids are excluded from ids which is scheduled launching: ${disableIds}",
)
}
val startEntityAckAdapter =
context.messageAdapter[ShardRegion.StartEntityAck](ack => ClassicStartEntityAck(ack.entityId))
Behaviors.withTimers { timers =>
@@ -46,7 +52,7 @@
startEntityAckAdapter,
settings.raftActorAutoStartNumberOfActors,
)
starter.behavior(ids, Set.empty)
starter.behavior(enableIds, Set.empty)
}
}.narrow[Nothing]
}
18 changes: 17 additions & 1 deletion src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala
@@ -1,7 +1,7 @@
package lerna.akka.entityreplication.raft

import akka.actor.{ ActorRef, Cancellable, Props, Stash }
import akka.persistence.RuntimePluginConfig
import akka.persistence.{ Recovery, RuntimePluginConfig }
import com.typesafe.config.{ Config, ConfigFactory }
import lerna.akka.entityreplication.ClusterReplication.EntityPropsProvider
import lerna.akka.entityreplication.ReplicationRegion.Msg
@@ -214,6 +214,22 @@ private[raft] class RaftActor(

override def snapshotPluginConfig: Config = ConfigFactory.empty()

private def isDisabled: Boolean = settings.disabledShards.contains(shardId.raw)

override def recovery: Recovery = {
if (isDisabled) {
Recovery.none
} else {
Recovery()
}
}

override def preStart(): Unit = {
if (isDisabled) {
context.stop(self)
}
}

val numberOfMembers: Int = settings.replicationFactor

@nowarn("msg=Use RaftMemberData.truncateAndAppendEntries instead.")
@@ -28,6 +28,8 @@ trait RaftSettings {

def numberOfShards: Int

def disabledShards: Set[String]

def maxAppendEntriesSize: Int

def maxAppendEntriesBatchSize: Int
@@ -17,6 +17,7 @@ private[entityreplication] final case class RaftSettingsImpl(
replicationFactor: Int,
quorumSize: Int,
numberOfShards: Int,
disabledShards: Set[String],
maxAppendEntriesSize: Int,
maxAppendEntriesBatchSize: Int,
compactionSnapshotCacheTimeToLive: FiniteDuration,
@@ -215,6 +216,8 @@ private[entityreplication] object RaftSettingsImpl {
s"snapshot-every ($eventSourcedSnapshotEvery) should be greater than 0.",
)

val disabledShards: Set[String] = config.getStringList("disabled-shards").asScala.toSet

RaftSettingsImpl(
config = config,
electionTimeout = electionTimeout,
@@ -224,6 +227,7 @@
replicationFactor = replicationFactor,
quorumSize = quorumSize,
numberOfShards = numberOfShards,
disabledShards = disabledShards,
maxAppendEntriesSize = maxAppendEntriesSize,
maxAppendEntriesBatchSize = maxAppendEntriesBatchSize,
compactionSnapshotCacheTimeToLive = compactionSnapshotCacheTimeToLive,
@@ -4,7 +4,14 @@ import akka.Done
import akka.actor.{ ActorLogging, ActorRef, ActorSystem, Props }
import akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings }
import akka.persistence.{ PersistentActor, RecoveryCompleted, SaveSnapshotFailure, SaveSnapshotSuccess, SnapshotOffer }
import akka.persistence.{
PersistentActor,
Recovery,
RecoveryCompleted,
SaveSnapshotFailure,
SaveSnapshotSuccess,
SnapshotOffer,
}
import akka.util.ByteString
import lerna.akka.entityreplication.{ ClusterReplicationSerializable, ClusterReplicationSettings }
import lerna.akka.entityreplication.model.{ NormalizedShardId, TypeName }
@@ -307,4 +314,19 @@ private[entityreplication] class CommitLogStoreActor(typeName: TypeName, setting
}
}

private def isDisabled: Boolean = settings.raftSettings.disabledShards.contains(shardId)

override def recovery: Recovery = {
if (isDisabled) {
Recovery.none
} else {
Recovery()
}
}

override def preStart(): Unit = {
if (isDisabled) {
context.stop(self)
}
}
}
@@ -1,6 +1,8 @@
package lerna.akka.entityreplication

import akka.Done
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps

import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{ Actor, ActorRef, ActorSelection, DiagnosticActorLogging, Props, RootActorPath, Terminated }
@@ -120,6 +122,7 @@ object ReplicationRegionSpecConfig extends MultiNodeConfig {
lerna.akka.entityreplication.raft.compaction.log-size-threshold = 2
lerna.akka.entityreplication.raft.compaction.preserve-log-size = 1
lerna.akka.entityreplication.raft.compaction.log-size-check-interval = 0.1s
lerna.akka.entityreplication.raft.disabled-shards = ["12"]
"""))
.withValue(
"lerna.akka.entityreplication.raft.multi-raft-roles",
@@ -334,6 +337,27 @@ class ReplicationRegionSpec extends MultiNodeSpec(ReplicationRegionSpecConfig) w
}
}

"drop all messages sent to disabled shards" in {
val typeName = createSeqTypeName()

runOn(node4, node5, node6) {
clusterReplication = createReplication(typeName)
}
enterBarrier("ReplicationRegion created")

// "disable-id"'s shard id is "12" and the shard id have been defined as disabled id.
val entityId = "disabled-id"
runOn(node4) {
LoggingTestKit
.warn(
"Following command had sent to disabled shards was dropped: lerna.akka.entityreplication.ReplicationRegionSpec$DummyReplicationActor$Cmd(shardId=12)",
).expect {
clusterReplication ! Cmd(entityId)
}(system.toTyped)
}
enterBarrier("Some command sent")
}

"Broadcast" when {

"全 MemberIndex に配信される" in {
@@ -74,6 +74,33 @@ final class ReplicationRegionRaftActorStarterSpec

}

"trigger all actor starts without disabled actor" in {
val shardRegionProbe = TestProbe()

val customSettings = RaftSettings(
ConfigFactory
.parseString("""
|lerna.akka.entityreplication.raft.disabled-shards = ["2"]
|""".stripMargin).withFallback(system.settings.config),
)
val raftActorStarter =
spawnRaftActorStarter(shardRegionProbe.ref, Set("1", "2", "3"), customSettings)

assume(customSettings.raftActorAutoStartNumberOfActors >= 3)

// The starter will stop at the end of this test.
watch(raftActorStarter)

// The starter should trigger starts for all actors except those on disabled shards.
val startedRaftActorIds = (1 to 2).map { _ =>
expectStartEntityAndThenAck(shardRegionProbe)
}.toSet
startedRaftActorIds shouldBe Set("1", "3")

// The starter should stop itself.
expectTerminated(raftActorStarter)
}

"retry all actor starts with no ACK" in {

val shardRegionProbe = TestProbe()
@@ -2,7 +2,7 @@ package lerna.akka.entityreplication.raft

import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import akka.actor.typed.scaladsl.adapter._
import akka.actor.{ typed, ActorSystem }
import akka.actor.{ typed, Actor, ActorSystem, Props }
import akka.persistence.testkit.scaladsl.PersistenceTestKit
import akka.testkit.{ TestKit, TestProbe }
import com.typesafe.config.ConfigFactory
@@ -672,6 +672,42 @@ class RaftActorSpec

}

"stop itself when its shard id is defined as disabled" in {
val shardId = createUniqueShardId()
val raftConfig = ConfigFactory
.parseString(s"""
| lerna.akka.entityreplication.raft.disabled-shards = ["${shardId.raw}"]
|""".stripMargin).withFallback(ConfigFactory.load())
val ref = system.actorOf(
Props(
new RaftActor(
typeName = defaultTypeName,
extractEntityId = {
case msg => (NormalizedEntityId.from("dummy"), msg)
},
replicationActorProps = _ =>
Props(new Actor() {
override def receive: Receive = {
case message => system.deadLetters forward message
}
}),
_region = TestProbe().ref,
shardSnapshotStoreProps = Props(new Actor {
override def receive: Receive = {
case msg => TestProbe().ref forward msg
}
}),
_selfMemberIndex = createUniqueMemberIndex(),
_otherMemberIndexes = Set(),
settings = RaftSettings(raftConfig),
_commitLogStore = TestProbe().ref,
),
),
name = shardId.underlying,
)
watch(ref)
expectTerminated(ref)
}
}

}
@@ -25,6 +25,7 @@ final class RaftSettingsSpec extends TestKit(ActorSystem("RaftSettingsSpec")) wi
settings.replicationFactor shouldBe 3
settings.quorumSize shouldBe 2
settings.numberOfShards shouldBe 100
settings.disabledShards shouldBe Set.empty
settings.maxAppendEntriesSize shouldBe 16
settings.maxAppendEntriesBatchSize shouldBe 10
settings.compactionSnapshotCacheTimeToLive shouldBe 10.seconds
@@ -30,6 +30,33 @@ object CommitLogStoreActorSpec {
| snapshot-store.plugin = ${PersistenceTestKitSnapshotPlugin.PluginId}
| snapshot-every = $snapshotEvery
|}
|lerna.akka.entityreplication.raft.disabled-shards = ["disabled-shard-id"]
|""".stripMargin)

def config: Config = {
@@ -568,6 +569,11 @@ final class CommitLogStoreActorSpec
snapshotTestKit.expectNothingPersisted(persistenceId)
}

"stop itself when its shard id is defined as disabled" in {
val (commitLogStoreActor, _, _) = spawnCommitLogStoreActor(name = Some("disabled-shard-id"))
watch(commitLogStoreActor)
expectTerminated(commitLogStoreActor)
}
}

}