Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Add typed API #79

Merged
merged 126 commits into from
Jun 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
126 commits
Select commit Hold shift + click to select a range
f115d05
✨feat: Add number-of-shard setting
negokaz Apr 9, 2021
d8de2f9
🚨test: validating number-of-shards setting
negokaz Apr 9, 2021
66df572
👷chore: Notice that number-of-shards is used by only typed APIs
negokaz Apr 9, 2021
d0bb3f9
Merge pull request #56 from lerna-stack/typed/number-of-shards-setting
negokaz Apr 9, 2021
26d171e
✨feat: Add ReplicationId API
negokaz Apr 8, 2021
504255e
👷chore: Add an akka-cluster-typed dependency
negokaz Apr 9, 2021
1346c4e
✨feat: Add Effect API
negokaz Apr 9, 2021
979d571
✨feat: Add ReplicatedEntityBehavior API
negokaz Apr 8, 2021
c6b6a8e
🚨test: Create BankAccountBehavior sample
negokaz Apr 9, 2021
ed6de79
Merge pull request #54 from lerna-stack/typed/replicated-entity-behav…
negokaz Apr 9, 2021
d9c852e
✨feat: Add ReplicationEnvelope
negokaz Apr 9, 2021
838415c
✨feat: Add ReplicatedEntityTypeKey
negokaz Apr 9, 2021
d645736
✨feat: Add ReplicatedEntity
negokaz Apr 9, 2021
8c39d7e
🔨refactor: Passivate extends ShardCommand (typed API)
negokaz Apr 9, 2021
7501c82
Merge pull request #57 from lerna-stack/typed/replicated-entity-api
tksugimoto Apr 9, 2021
96dee08
✨feat: Add ReplicatedEntityBehaviorTestKit
negokaz Apr 9, 2021
b2e5c5d
Merge pull request #58 from lerna-stack/typed/replicated-entity-behav…
tksugimoto Apr 9, 2021
b446698
✨feat: Add ReplicatedEntityRef API
negokaz Apr 12, 2021
cf0b3cd
📚doc: Fix example code
negokaz Apr 12, 2021
c469c7c
Merge pull request #59 from lerna-stack/typed/replicated-entity-ref-api
tksugimoto Apr 12, 2021
cba142a
✨feat: Add typed 'askTo' API
negokaz Apr 12, 2021
76e00b2
✨feat: Add ClusterReplicaiton.entityRefFor
negokaz Apr 12, 2021
62afb79
🐞fix: Annotate typeKey with ReplicatedEntityTypeKey instead of Replic…
negokaz Apr 12, 2021
e73f8cb
Merge pull request #61 from lerna-stack/typed/at-least-once-complete-api
tksugimoto Apr 12, 2021
2a5a503
Merge branch 'feature/typed-api' into typed/cluster-replication-api
tksugimoto Apr 12, 2021
0f05d33
Merge pull request #60 from lerna-stack/typed/cluster-replication-api
tksugimoto Apr 12, 2021
e02731d
🔨refactor: Move case classes in ReplicationActor to RaftProtocol
negokaz Apr 13, 2021
1277ec6
🔨refactor: Annotate case classes with RaftActorCommand/EntityCommand
negokaz Apr 13, 2021
c37544e
🔨refactor: Decorate case classes in RaftProcotol with 'final'
negokaz Apr 13, 2021
5359d67
🔨refactor: Add ProcessCommand instead of Command to pass command to t…
negokaz Apr 13, 2021
f7aea39
🔨refactor: Arrange definition of commands in RaftProtocol
negokaz Apr 13, 2021
0406fee
👷chore: Add mima excludes filter
negokaz Apr 13, 2021
11c7142
Merge pull request #62 from lerna-stack/typed/refactor-entity-commands
negokaz Apr 13, 2021
6764831
🔨refactor: Remove string properties from ReplicationId
negokaz Apr 15, 2021
f306221
🔨refactor: ReplicatedEntityBehavior does not extend Behavior
negokaz Apr 15, 2021
9e85da9
🔨refactor: RaftActorCommand extends ShardCommand
negokaz Apr 14, 2021
bd56ff2
✨feat: Use DeferredBehavior for ReplicatedEntityBehavior
negokaz Apr 15, 2021
b36448b
🔨refactor: Make ReplicationId an internal API
negokaz Apr 15, 2021
515b3ec
Merge pull request #63 from lerna-stack/typed/fix-replicated-entity-b…
tksugimoto Apr 15, 2021
519ef98
✨feat: Implement Effects
negokaz Apr 13, 2021
74676a8
✨feat: Implement ReplicationId
negokaz Apr 13, 2021
53cdc28
🔨refactor: Arrange import statements in Effect
negokaz Apr 14, 2021
bd23ac9
✨feat: Implement ReplicatedEntityBehavior
negokaz Apr 13, 2021
ae930c1
👷chore: Add a dependency for akka-actor-testkit-typed
negokaz Apr 15, 2021
5cd1d7b
✨feat: Implement ReplicatedEntityTypeKey
negokaz Apr 16, 2021
a6d77e4
🚨test: Set root logger
negokaz Apr 16, 2021
261cd1c
🚨test: Add application-test.conf
negokaz Apr 16, 2021
0a6ee1a
✨feat: Add stopLocally effect
negokaz Apr 16, 2021
2ae7636
✨feat: Implement the stopLocally effect in ReplicatedEntityBehavior
negokaz Apr 16, 2021
d9a6eae
✨feat: Add withStopMessage API
negokaz Apr 16, 2021
f66a5cd
🚨test: Add ReplicatedEntityBehavior spec
negokaz Apr 16, 2021
3df05a4
✨feat: unstashAll when the behavior become Ready
negokaz Apr 19, 2021
d6aa106
🔨refactor: Use EntityInstanceId instead of InstanceId
negokaz Apr 19, 2021
5a19910
✨feat: Ignore ReplicationSucceeded if its instanceId is not same with…
negokaz Apr 19, 2021
e96902a
🔨refactor: Add comments
negokaz Apr 19, 2021
b515fab
✨feat: Output a log when entity recovery is timed out
negokaz Apr 19, 2021
084ee6d
✨feat: Ignore LogEntry that has older LogEntryIndex than already appl…
negokaz Apr 19, 2021
12704de
✨feat: StopLocally effect does not require thenNoReply effect
negokaz Apr 19, 2021
51b23c8
✨feat: Add EntityReplication specific properties
negokaz Apr 21, 2021
28c1e02
Merge pull request #70 from lerna-stack/typed/fix-replicated-entity-b…
tksugimoto Apr 21, 2021
73301b7
✨feat: Make it possible to send messages to any RecepientRefs, not ju…
negokaz Apr 30, 2021
82538b7
✨feat: Add an API AtLeastOnceComplete.askWithStatusTo to use StatusReply
negokaz Apr 30, 2021
e07ed3a
📚doc: Add API doc for 'askTo' and 'askWithStatusTo'
negokaz Apr 30, 2021
5efcedb
Merge pull request #71 from lerna-stack/typed/improve-at-least-once-c…
negokaz Apr 30, 2021
8070a4f
🔨refactor: Compare instanceIds as EntityInstanceId
negokaz May 6, 2021
04bdb0c
🔨refactor: Set instanceId of state to Replicate command directly
negokaz May 6, 2021
f3e0420
🔨refactor: Add the condition of ReplicationSucceeded that the Entity …
negokaz May 6, 2021
224692c
🚨test: Check that instanceId of Replicate command is not empty
negokaz May 6, 2021
5841ca1
🔨refactor: Remove meaningless comment
negokaz May 6, 2021
b3de1ab
✨feat: ReplicatedEntityBehavior handles signals
negokaz May 6, 2021
2c0cd13
🚨test: Check receiveSignal works
negokaz May 6, 2021
90b197b
🔀Merge branch 'master' into merge-master/2021-05-06
negokaz May 6, 2021
2440007
Merge pull request #74 from lerna-stack/typed/merge-master/2021-05-06
tksugimoto May 6, 2021
372a42f
👷chore: Remove unnecessary mima filter
negokaz May 7, 2021
f211556
Merge pull request #75 from lerna-stack/remove-unnecessary-mima-filter
tksugimoto May 7, 2021
e064708
✨feat: Apply unhandled side effect
negokaz May 7, 2021
1342fb4
🚨test: Unhandled command is published to event stream
negokaz May 7, 2021
424fb92
✨feat: Use NoReplyEffect
negokaz May 7, 2021
544d316
🐞fix: Don't drop the behavior that is produced by unstashAll
negokaz May 7, 2021
ad1e552
🔨refactor: Use Behaviors.withStash instead of StashBuffer.apply (inte…
negokaz May 7, 2021
fcfeffc
🐞fix: Don't handle signals that are not handled by user defined signa…
negokaz May 7, 2021
51db514
Merge branch 'feature/typed-api' into typed/replicated-entity-behavio…
tksugimoto May 7, 2021
af6b023
🔨refactor: Make BehaviorSetup a plain class
negokaz May 7, 2021
6d3b8b5
🔨refactor: Move instanceId and stashBuffer to BehaviorSetup
negokaz May 7, 2021
fbbd1d1
Merge pull request #64 from lerna-stack/typed/replicated-entity-behav…
tksugimoto May 7, 2021
148960f
✨feat: Implment ReplicatedEntity
negokaz Apr 19, 2021
32c417a
Merge pull request #65 from lerna-stack/typed/replicated-entity
tksugimoto May 7, 2021
80b64ec
👷chore: Make the akka-actor-testkit-typed dependency to Optional
negokaz Apr 20, 2021
af8fb63
✨feat: Impelemnt ReplicatedEntityBehaviorTestkit
negokaz Apr 20, 2021
44de761
🚨test: Add a spec for ReplicatedEntityBehaviorTestKit
negokaz Apr 27, 2021
99dbdf7
✨feat: Implement ReplicatedEntityRef
negokaz Apr 20, 2021
21b86e8
🚨test: Add ReplicatedEntityRefSpec
negokaz Apr 26, 2021
b34f3d2
🔨refactor: Don't cast to get InternalActorRef
negokaz May 10, 2021
b23093f
📚doc: Add a note about recommended Akka version
negokaz May 10, 2021
d375219
Merge pull request #67 from lerna-stack/typed/replicated-entity-ref-impl
tksugimoto May 10, 2021
7c78638
✨feat: Implement typed AtLeastOnceComplete.askTo
negokaz Apr 20, 2021
049db40
🚨test: Add test cases for AtLeastOnceComplete (typed API)
negokaz Apr 30, 2021
c273f3a
🔨refactor: Remove a unused type parameter
negokaz May 10, 2021
0045566
✨feat: Change log level from debug to warn while retrying
negokaz-tis May 11, 2021
c6a4576
🔨refactor: Create LoggigAdapter in each askTo method
negokaz-tis May 11, 2021
c28823c
📚doc: Fix explains about the result when timeout occurs
negokaz-tis May 11, 2021
23ce462
🔨refactor: Remove a unused import
negokaz-tis May 11, 2021
eebb459
Merge pull request #68 from lerna-stack/typed/at-least-once-complete-…
tksugimoto May 11, 2021
076ee8d
Merge branch 'feature/typed-api' into typed/replicated-entity-behavio…
tksugimoto May 11, 2021
482dc11
🚨test: Add assertion to check CommandResult.state
negokaz-tis May 13, 2021
c3655c3
Merge pull request #69 from lerna-stack/typed/replicated-entity-behav…
tksugimoto May 13, 2021
e96648f
✨feat: Add internal API to start ClusterReplication with EntityPropsP…
negokaz Apr 19, 2021
8b5b5ed
✨feat: Implement ClusterReplication.init
negokaz Apr 19, 2021
7ae5f57
✨feat: Implement ClusterReplication.entityRefFor
negokaz Apr 30, 2021
dc355ca
🚨test: Add a spec for ClusterReplication
negokaz Apr 30, 2021
f3dfab9
🔨refactor: Store typed ActorRef to regions Map
negokaz-tis May 13, 2021
54bff32
✨feat: Implement a serialization for ReplicationEnvelope
negokaz-tis May 13, 2021
0a5990a
🐞fix: Make classic ClusterReplication an ActorSystem extension
negokaz-tis May 13, 2021
9d5864f
🔨refactor: Change 'NormalizedEntityId.decoded: String' to '.raw: Enti…
negokaz May 21, 2021
a65a8e6
🐞fix: Prevent the issue of extra shards might be created
negokaz May 21, 2021
220e6d9
Merge pull request #66 from lerna-stack/typed/cluster-replication-impl
tksugimoto May 27, 2021
451a265
🚨test: Move common config to multi-jvm-testing.conf
negokaz-tis May 13, 2021
1836cd7
👷chore: Add a dependency for akka-actor-testkit-typed in Test scope
negokaz-tis May 13, 2021
69431f7
🚨test: Implement ReplicatedEntityMultiNodeSpec
negokaz-tis May 13, 2021
fe23e4f
🚨test: Implement ReplicatedEntitySnapshotMultiNodeSpec
negokaz-tis May 13, 2021
9989e5c
🚨test: Add a test for Effect.stash
negokaz Jun 2, 2021
43baa2a
🚨test: ReplicatedEntityBehavior should stop when illegal any settings…
negokaz-tis Jun 2, 2021
2c8e2fa
🚨test: Use EntityRef as RecipientRef
negokaz-tis Jun 3, 2021
3259319
🚨test: Retry findEntityActorRef() when receptionist doesn't have rece…
negokaz Jun 3, 2021
6c678d6
📚doc: Add a comment: why we don't use Effect.reply
negokaz Jun 8, 2021
cecd43a
Merge pull request #78 from lerna-stack/typed/multi-jvm-test
tksugimoto Jun 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ libraryDependencies += "com.lerna-stack" %% "akka-entity-replication" % "X.X.X-S

These versions of akka-entity-replication depends on **Akka 2.6.x**. It has been published for Scala 2.13.

**NOTE**: akka-entity-replication uses Akka's internal API.
It is recommended that you use the same version of Akka as akka-entity-replication for your application to avoid compatibility issues.
Please see [build.sbt](./build.sbt) for the version of Akka that akka-entity-replication depends on.

For more information on how to implement an application using this library, please refer to the following documents.

- [Implementation Guide](docs/implementation_guide.md)
Expand Down
16 changes: 10 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,22 @@ lazy val lerna = (project in file("."))
fork in Test := true,
parallelExecution in Test := false,
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
// persistence-query 2.6.x を明示的に指定しないとエラーになる。
// 恐らく akka-persistence-inmemory の影響である。
"com.typesafe.akka" %% "akka-persistence-query" % akkaVersion,
"io.altoo" %% "akka-kryo-serialization" % "1.1.5" % Test,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion % Test,
"ch.qos.logback" % "logback-classic" % "1.2.3" % Test,
"org.scalatest" %% "scalatest" % "3.0.9" % Test,
"com.typesafe.akka" %% "akka-multi-node-testkit" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-persistence-query" % akkaVersion,
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Optional,
// multi-jvm:test can't resolve [Optional] dependency
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test,
"io.altoo" %% "akka-kryo-serialization" % "1.1.5" % Test,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion % Test,
"ch.qos.logback" % "logback-classic" % "1.2.3" % Test,
"org.scalatest" %% "scalatest" % "3.0.9" % Test,
"com.typesafe.akka" %% "akka-multi-node-testkit" % akkaVersion % Test,
// akka-persistence-inmemory が 2.6.x 系に対応していない。
// TODO 2.6.x 系に対応できる方法に変更する。
"com.github.dnvriend" %% "akka-persistence-inmemory" % "2.5.15.2" % Test,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Following classes and objects will be package private in next release
#
# object lerna.akka.entityreplication.ReplicationActor#EntityRecoveryTimeoutException does not have a correspondent in current version
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.ReplicationActor$EntityRecoveryTimeoutException$")
# class lerna.akka.entityreplication.ReplicationActor#Snapshot does not have a correspondent in current version
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.ReplicationActor$Snapshot")
# object lerna.akka.entityreplication.ReplicationActor#TakeSnapshot does not have a correspondent in current version
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.ReplicationActor$TakeSnapshot$")
# class lerna.akka.entityreplication.ReplicationActor#EntityRecoveryTimeoutException does not have a correspondent in current version
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.ReplicationActor$EntityRecoveryTimeoutException")
# object lerna.akka.entityreplication.ReplicationActor#RecoveryTimeout does not have a correspondent in current version
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.ReplicationActor$RecoveryTimeout$")
# object lerna.akka.entityreplication.ReplicationActor#Snapshot does not have a correspondent in current version
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.ReplicationActor$Snapshot$")
# class lerna.akka.entityreplication.ReplicationActor#TakeSnapshot does not have a correspondent in current version
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.ReplicationActor$TakeSnapshot")
# class lerna.akka.entityreplication.raft.RaftProtocol#ReplicationSucceeded is declared final in current version
ProblemFilters.exclude[FinalClassProblem]("lerna.akka.entityreplication.raft.RaftProtocol$ReplicationSucceeded")
# class lerna.akka.entityreplication.raft.RaftProtocol#Command is declared final in current version
ProblemFilters.exclude[FinalClassProblem]("lerna.akka.entityreplication.raft.RaftProtocol$Command")
# class lerna.akka.entityreplication.raft.RaftProtocol#Replicate is declared final in current version
ProblemFilters.exclude[FinalClassProblem]("lerna.akka.entityreplication.raft.RaftProtocol$Replicate")
# class lerna.akka.entityreplication.raft.RaftProtocol#ForwardedCommand is declared final in current version
ProblemFilters.exclude[FinalClassProblem]("lerna.akka.entityreplication.raft.RaftProtocol$ForwardedCommand")
# class lerna.akka.entityreplication.raft.RaftProtocol#Replica is declared final in current version
ProblemFilters.exclude[FinalClassProblem]("lerna.akka.entityreplication.raft.RaftProtocol$Replica")
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# [ClusterReplication(system): ClusterReplication] can be used as before.
#
# static method apply(akka.actor.ActorSystem)lerna.akka.entityreplication.ClusterReplication in class lerna.akka.entityreplication.ClusterReplication has a different result type in current version, where it is akka.actor.Extension rather than lerna.akka.entityreplication.ClusterReplication
ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.akka.entityreplication.ClusterReplication.apply")
# method this(akka.actor.ActorSystem)Unit in class lerna.akka.entityreplication.ClusterReplication's type is different in current version, where it is (akka.actor.ExtendedActorSystem)Unit instead of (akka.actor.ActorSystem)Unit
ProblemFilters.exclude[IncompatibleMethTypeProblem]("lerna.akka.entityreplication.ClusterReplication.this")
# method apply(akka.actor.ActorSystem)lerna.akka.entityreplication.ClusterReplication in object lerna.akka.entityreplication.ClusterReplication has a different result type in current version, where it is akka.actor.Extension rather than lerna.akka.entityreplication.ClusterReplication
ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.akka.entityreplication.ClusterReplication.apply")
8 changes: 8 additions & 0 deletions src/main/protobuf/cluster_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ message MemberIndex {
required string role = 1;
}

// ===
// typed
// ===

message ReplicationEnvelope {
required string entityId = 1;
required Payload message = 2;
}

// ===
// payload
Expand Down
6 changes: 6 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ lerna.akka.entityreplication {
// The number of roles is the number of replicas. It is recommended to set up at least three roles.
multi-raft-roles = ["replica-group-1", "replica-group-2", "replica-group-3"]

// Number of shards per single multi-raft-role used by only typed APIs.
// This value must be the same for all nodes in the cluster
// and must not be changed after starting to use.
// Changing this value will cause data inconsistency.
number-of-shards = 100

// Maximum number of entries which AppendEntries contains.
// The too large size will cause message serialization failure.
max-append-entries-size = 16
Expand Down
6 changes: 6 additions & 0 deletions src/main/scala/akka/lerna/DeferredBehavior.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package akka.lerna

/**
* Expose internal API of Akka to use it in [[lerna]] package.
*/
abstract class DeferredBehavior[Command] extends akka.actor.typed.internal.BehaviorImpl.DeferredBehavior[Command]
19 changes: 19 additions & 0 deletions src/main/scala/akka/lerna/InternalActorRefProxy.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package akka.lerna

import akka.actor.ActorRefProvider
import akka.actor.typed.ActorRef
import akka.actor.typed.internal.adapter.ActorRefAdapter

object InternalActorRefProxy {
def apply[T](ref: ActorRef[T]): InternalActorRefProxy[T] =
new InternalActorRefProxy[T](ref)
}

class InternalActorRefProxy[T](ref: ActorRef[T]) {

private[this] val classicRef = ActorRefAdapter.toClassic(ref)

def provider: ActorRefProvider = classicRef.provider

def isTerminated: Boolean = classicRef.isTerminated
}
6 changes: 6 additions & 0 deletions src/main/scala/akka/lerna/InternalRecipientRef.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package akka.lerna

/**
* Expose internal API of Akka to use it in [[lerna]] package.
*/
trait InternalRecipientRef[-T] extends akka.actor.typed.internal.InternalRecipientRef[T]
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package lerna.akka.entityreplication

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.actor.{ ActorRef, ExtendedActorSystem, Extension, ExtensionId, Props }
import lerna.akka.entityreplication.model.TypeName
import lerna.akka.entityreplication.raft.eventsourced.{ CommitLogStore, ShardedCommitLogStore }

object ClusterReplication {
object ClusterReplication extends ExtensionId[ClusterReplication] {

def apply(system: ActorSystem): ClusterReplication = new ClusterReplication(system)
override def createExtension(system: ExtendedActorSystem): ClusterReplication = new ClusterReplication(system)

private val actorNamePrefix: String = "replicationRegion"

private[entityreplication] type EntityPropsProvider = ReplicationActorContext => Props
}

class ClusterReplication private (system: ActorSystem) {
class ClusterReplication private (system: ExtendedActorSystem) extends Extension {

import ClusterReplication._

Expand All @@ -21,6 +23,16 @@ class ClusterReplication private (system: ActorSystem) {
settings: ClusterReplicationSettings,
extractEntityId: ReplicationRegion.ExtractEntityId,
extractShardId: ReplicationRegion.ExtractShardId,
): ActorRef = {
internalStart(typeName, _ => entityProps, settings, extractEntityId, extractShardId)
}

private[entityreplication] def internalStart(
typeName: String,
entityProps: EntityPropsProvider,
settings: ClusterReplicationSettings,
extractEntityId: ReplicationRegion.ExtractEntityId,
extractShardId: ReplicationRegion.ExtractShardId,
): ActorRef = {
val _typeName = TypeName.from(typeName)

Expand All @@ -31,7 +43,7 @@ class ClusterReplication private (system: ActorSystem) {
Option.when(enabled)(new ShardedCommitLogStore(_typeName, system))
}

system.actorOf(
system.systemActorOf(
ReplicationRegion.props(typeName, entityProps, settings, extractEntityId, extractShardId, maybeCommitLogStore),
s"$actorNamePrefix-$typeName",
)
Expand Down
13 changes: 2 additions & 11 deletions src/main/scala/lerna/akka/entityreplication/ReplicationActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package lerna.akka.entityreplication

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{ Actor, ActorPath, ActorRef, Cancellable, Stash }
import akka.actor.{ Actor, Cancellable, Stash }
import akka.event.Logging
import lerna.akka.entityreplication.model.{ EntityInstanceId, NormalizedEntityId }
import lerna.akka.entityreplication.raft.RaftProtocol._
Expand All @@ -15,18 +15,9 @@ private[entityreplication] object ReplicationActor {
private[this] val instanceIdCounter = new AtomicInteger(1)

private def generateInstanceId(): EntityInstanceId = EntityInstanceId(instanceIdCounter.getAndIncrement())

private[entityreplication] final case class TakeSnapshot(metadata: EntitySnapshotMetadata, replyTo: ActorRef)
private[entityreplication] final case class Snapshot(metadata: EntitySnapshotMetadata, state: EntityState)

private final case object RecoveryTimeout

private[entityreplication] final case class EntityRecoveryTimeoutException(entityPath: ActorPath)
extends RuntimeException
}

trait ReplicationActor[StateData] extends Actor with Stash with akka.lerna.StashFactory {
import ReplicationActor._
import context.dispatcher

private val internalStash = createStash()
Expand Down Expand Up @@ -92,7 +83,7 @@ trait ReplicationActor[StateData] extends Actor with Stash with akka.lerna.Stash

override def stateReceive(receive: Receive, message: Any): Unit =
message match {
case Command(command) =>
case ProcessCommand(command) =>
receive.applyOrElse[Any, Unit](
command,
command => {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package lerna.akka.entityreplication

import akka.actor.ActorRef

private[entityreplication] class ReplicationActorContext(val entityId: String, val shard: ActorRef)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import akka.cluster.sharding.ShardRegion.{ GracefulShutdown, HashCodeMessageExtr
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import akka.cluster.{ Cluster, Member, MemberStatus }
import akka.routing.{ ActorRefRoutee, ConsistentHashingRouter, ConsistentHashingRoutingLogic, Router }
import lerna.akka.entityreplication.ClusterReplication.EntityPropsProvider
import lerna.akka.entityreplication.ReplicationRegion.{ ExtractEntityId, ExtractShardId }
import lerna.akka.entityreplication.model._
import lerna.akka.entityreplication.raft.RaftActor
Expand All @@ -14,6 +15,7 @@ import lerna.akka.entityreplication.raft.eventsourced.CommitLogStore
import lerna.akka.entityreplication.raft.protocol.ShardRequest
import lerna.akka.entityreplication.raft.routing.MemberIndex
import lerna.akka.entityreplication.raft.snapshot.ShardSnapshotStore
import lerna.akka.entityreplication.typed.ClusterReplication.ShardCommand

import scala.collection.mutable

Expand Down Expand Up @@ -53,7 +55,7 @@ object ReplicationRegion {

private[entityreplication] def props(
typeName: String,
entityProps: Props,
entityProps: EntityPropsProvider,
settings: ClusterReplicationSettings,
extractEntityId: ExtractEntityId,
extractShardId: ExtractShardId,
Expand All @@ -63,7 +65,7 @@ object ReplicationRegion {

private[entityreplication] case class CreateShard(shardId: NormalizedShardId) extends ShardRequest

final case class Passivate(entityPath: ActorPath, stopMessage: Any)
final case class Passivate(entityPath: ActorPath, stopMessage: Any) extends ShardCommand

private[entityreplication] sealed trait RoutingCommand
private[entityreplication] final case class Broadcast(message: Any) extends RoutingCommand
Expand All @@ -80,7 +82,7 @@ object ReplicationRegion {

private[entityreplication] class ReplicationRegion(
typeName: String,
entityProps: Props,
entityProps: EntityPropsProvider,
settings: ClusterReplicationSettings,
extractEntityId: ExtractEntityId,
extractShardId: ExtractShardId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package lerna.akka.entityreplication.model

import akka.actor.ActorPath
import akka.util.ByteString
import lerna.akka.entityreplication.ReplicationRegion.EntityId

import java.net.URLEncoder
import java.net.{ URLDecoder, URLEncoder }

private[entityreplication] object NormalizedEntityId {
def from(entityId: EntityId): NormalizedEntityId = new NormalizedEntityId(URLEncoder.encode(entityId, "utf-8"))
Expand All @@ -14,4 +15,7 @@ private[entityreplication] object NormalizedEntityId {
new NormalizedEntityId(encodedEntityId)
}

private[entityreplication] final case class NormalizedEntityId private (underlying: String) extends AnyVal
private[entityreplication] final case class NormalizedEntityId private (underlying: String) extends AnyVal {

def raw: EntityId = URLDecoder.decode(underlying, ByteString.UTF_8)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor.ExtendedActorSystem
import akka.persistence.query.{ NoOffset, Offset, Sequence, TimeBasedUUID }
import akka.serialization.{ BaseSerializer, SerializationExtension, SerializerWithStringManifest, Serializers }
import com.google.protobuf.ByteString
import lerna.akka.entityreplication.{ model, raft, ClusterReplicationSerializable }
import lerna.akka.entityreplication.{ model, raft, typed, ClusterReplicationSerializable }

import java.io.NotSerializableException
import java.util.UUID
Expand Down Expand Up @@ -52,6 +52,8 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
private val TimeBasedUUIDManifest = "EE"
// raft.model
private val NoOpManifest = "FA"
// typed
private val ReplicationEnvelopeManifest = "GA"

// Manifest -> fromBinary
private val fromBinaryMap = HashMap[String, Array[Byte] => ClusterReplicationSerializable](
Expand Down Expand Up @@ -90,6 +92,8 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
TimeBasedUUIDManifest -> timeBasedUUIDEnvelopeFromBinary,
// raft.model
NoOpManifest -> noOpFromBinary,
// typed
ReplicationEnvelopeManifest -> replicationEnvelopeFromBinary,
)

override def manifest(o: AnyRef): String = {
Expand Down Expand Up @@ -156,6 +160,8 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
case _: TimeBasedUUIDEnvelope => TimeBasedUUIDManifest
// raft.model
case _: raft.model.NoOp.type => NoOpManifest
// typed
case _: typed.ReplicationEnvelope[_] => ReplicationEnvelopeManifest
}

private val serializableToBinary: PartialFunction[ClusterReplicationSerializable, Array[Byte]] = {
Expand Down Expand Up @@ -194,6 +200,8 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
case m: TimeBasedUUIDEnvelope => timeBasedUUIDEnvelopeToBinary(m)
// raft.model
case m: raft.model.NoOp.type => noOpToBinary(m)
// typed
case m: typed.ReplicationEnvelope[_] => replicationEnvelopeToBinary(m)
}

// ===
Expand Down Expand Up @@ -907,6 +915,26 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
)
}

// ===
// typed
// ===

private def replicationEnvelopeToBinary(message: typed.ReplicationEnvelope[_]): Array[Byte] = {
msg.ReplicationEnvelope
.of(
entityId = message.entityId,
message = payloadToProto(message.message),
).toByteArray
}

private def replicationEnvelopeFromBinary(bytes: Array[Byte]): typed.ReplicationEnvelope[_] = {
val proto = msg.ReplicationEnvelope.parseFrom(bytes)
typed.ReplicationEnvelope(
proto.entityId,
message = payloadFromProto(proto.message),
)
}

// ===
// payload
// ===
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package lerna.akka.entityreplication.raft

import lerna.akka.entityreplication.ReplicationActor
import lerna.akka.entityreplication.raft.RaftProtocol._
import lerna.akka.entityreplication.raft.protocol.RaftCommands._
import lerna.akka.entityreplication.raft.protocol.{ SuspendEntity, TryCreateEntity }
Expand Down Expand Up @@ -42,7 +41,7 @@ private[raft] trait Candidate { this: RaftActor =>
case response: SnapshotProtocol.FetchSnapshotResponse => receiveFetchSnapshotResponse(response)
case SuspendEntity(_, entityId, stopMessage) => suspendEntity(entityId, stopMessage)
case SnapshotTick => handleSnapshotTick()
case response: ReplicationActor.Snapshot => receiveEntitySnapshotResponse(response)
case response: Snapshot => receiveEntitySnapshotResponse(response)
case response: SnapshotProtocol.SaveSnapshotResponse => receiveSaveSnapshotResponse(response)
case _: akka.persistence.SaveSnapshotSuccess => // ignore
case _: akka.persistence.SaveSnapshotFailure => // ignore: no problem because events exist even if snapshot saving failed
Expand Down
Loading