Skip to content

Commit

Permalink
Merge pull request #79 from lerna-stack/feature/typed-api
Browse files Browse the repository at this point in the history
[Feature] Add typed API
  • Loading branch information
tksugimoto authored Jun 9, 2021
2 parents b27ec73 + cecd43a commit 165d1ce
Show file tree
Hide file tree
Showing 70 changed files with 3,762 additions and 83 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ libraryDependencies += "com.lerna-stack" %% "akka-entity-replication" % "X.X.X-S

These versions of akka-entity-replication depends on **Akka 2.6.x**. It has been published for Scala 2.13.

**NOTE**: akka-entity-replication uses Akka's internal API.
It is recommended that you use the same version of Akka as akka-entity-replication for your application to avoid compatibility issues.
Please see [build.sbt](./build.sbt) for the version of Akka that akka-entity-replication depends on.

For more information on how to implement an application using this library, please refer to the following documents.

- [Implementation Guide](docs/implementation_guide.md)
Expand Down
16 changes: 10 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,22 @@ lazy val lerna = (project in file("."))
fork in Test := true,
parallelExecution in Test := false,
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster" % akkaVersion,
"com.typesafe.akka" %% "akka-cluster-sharding" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
// persistence-query 2.6.x を明示的に指定しないとエラーになる。
// 恐らく akka-persistence-inmemory の影響である。
"com.typesafe.akka" %% "akka-persistence-query" % akkaVersion,
"io.altoo" %% "akka-kryo-serialization" % "1.1.5" % Test,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion % Test,
"ch.qos.logback" % "logback-classic" % "1.2.3" % Test,
"org.scalatest" %% "scalatest" % "3.0.9" % Test,
"com.typesafe.akka" %% "akka-multi-node-testkit" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-persistence-query" % akkaVersion,
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Optional,
// multi-jvm:test can't resolve [Optional] dependency
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test,
"io.altoo" %% "akka-kryo-serialization" % "1.1.5" % Test,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion % Test,
"ch.qos.logback" % "logback-classic" % "1.2.3" % Test,
"org.scalatest" %% "scalatest" % "3.0.9" % Test,
"com.typesafe.akka" %% "akka-multi-node-testkit" % akkaVersion % Test,
// akka-persistence-inmemory が 2.6.x 系に対応していない。
// TODO 2.6.x 系に対応できる方法に変更する。
"com.github.dnvriend" %% "akka-persistence-inmemory" % "2.5.15.2" % Test,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Following classes and objects will be package private in next release
#
# object lerna.akka.entityreplication.ReplicationActor#EntityRecoveryTimeoutException does not have a correspondent in current version
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.ReplicationActor$EntityRecoveryTimeoutException$")
# class lerna.akka.entityreplication.ReplicationActor#Snapshot does not have a correspondent in current version
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.ReplicationActor$Snapshot")
# object lerna.akka.entityreplication.ReplicationActor#TakeSnapshot does not have a correspondent in current version
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.ReplicationActor$TakeSnapshot$")
# class lerna.akka.entityreplication.ReplicationActor#EntityRecoveryTimeoutException does not have a correspondent in current version
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.ReplicationActor$EntityRecoveryTimeoutException")
# object lerna.akka.entityreplication.ReplicationActor#RecoveryTimeout does not have a correspondent in current version
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.ReplicationActor$RecoveryTimeout$")
# object lerna.akka.entityreplication.ReplicationActor#Snapshot does not have a correspondent in current version
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.ReplicationActor$Snapshot$")
# class lerna.akka.entityreplication.ReplicationActor#TakeSnapshot does not have a correspondent in current version
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.ReplicationActor$TakeSnapshot")
# class lerna.akka.entityreplication.raft.RaftProtocol#ReplicationSucceeded is declared final in current version
ProblemFilters.exclude[FinalClassProblem]("lerna.akka.entityreplication.raft.RaftProtocol$ReplicationSucceeded")
# class lerna.akka.entityreplication.raft.RaftProtocol#Command is declared final in current version
ProblemFilters.exclude[FinalClassProblem]("lerna.akka.entityreplication.raft.RaftProtocol$Command")
# class lerna.akka.entityreplication.raft.RaftProtocol#Replicate is declared final in current version
ProblemFilters.exclude[FinalClassProblem]("lerna.akka.entityreplication.raft.RaftProtocol$Replicate")
# class lerna.akka.entityreplication.raft.RaftProtocol#ForwardedCommand is declared final in current version
ProblemFilters.exclude[FinalClassProblem]("lerna.akka.entityreplication.raft.RaftProtocol$ForwardedCommand")
# class lerna.akka.entityreplication.raft.RaftProtocol#Replica is declared final in current version
ProblemFilters.exclude[FinalClassProblem]("lerna.akka.entityreplication.raft.RaftProtocol$Replica")
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# [ClusterReplication(system): ClusterReplication] can be used as before.
#
# static method apply(akka.actor.ActorSystem)lerna.akka.entityreplication.ClusterReplication in class lerna.akka.entityreplication.ClusterReplication has a different result type in current version, where it is akka.actor.Extension rather than lerna.akka.entityreplication.ClusterReplication
ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.akka.entityreplication.ClusterReplication.apply")
# method this(akka.actor.ActorSystem)Unit in class lerna.akka.entityreplication.ClusterReplication's type is different in current version, where it is (akka.actor.ExtendedActorSystem)Unit instead of (akka.actor.ActorSystem)Unit
ProblemFilters.exclude[IncompatibleMethTypeProblem]("lerna.akka.entityreplication.ClusterReplication.this")
# method apply(akka.actor.ActorSystem)lerna.akka.entityreplication.ClusterReplication in object lerna.akka.entityreplication.ClusterReplication has a different result type in current version, where it is akka.actor.Extension rather than lerna.akka.entityreplication.ClusterReplication
ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.akka.entityreplication.ClusterReplication.apply")
8 changes: 8 additions & 0 deletions src/main/protobuf/cluster_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ message MemberIndex {
required string role = 1;
}

// ===
// typed
// ===

message ReplicationEnvelope {
required string entityId = 1;
required Payload message = 2;
}

// ===
// payload
Expand Down
6 changes: 6 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ lerna.akka.entityreplication {
// The number of roles is the number of replicas. It is recommended to set up at least three roles.
multi-raft-roles = ["replica-group-1", "replica-group-2", "replica-group-3"]

// Number of shards per single multi-raft-role used by only typed APIs.
// This value must be the same for all nodes in the cluster
// and must not be changed after starting to use.
// Changing this value will cause data inconsistency.
number-of-shards = 100

// Maximum number of entries which AppendEntries contains.
// The too large size will cause message serialization failure.
max-append-entries-size = 16
Expand Down
6 changes: 6 additions & 0 deletions src/main/scala/akka/lerna/DeferredBehavior.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package akka.lerna

/**
* Expose internal API of Akka to use it in [[lerna]] package.
*/
abstract class DeferredBehavior[Command] extends akka.actor.typed.internal.BehaviorImpl.DeferredBehavior[Command]
19 changes: 19 additions & 0 deletions src/main/scala/akka/lerna/InternalActorRefProxy.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package akka.lerna

import akka.actor.ActorRefProvider
import akka.actor.typed.ActorRef
import akka.actor.typed.internal.adapter.ActorRefAdapter

object InternalActorRefProxy {
def apply[T](ref: ActorRef[T]): InternalActorRefProxy[T] =
new InternalActorRefProxy[T](ref)
}

class InternalActorRefProxy[T](ref: ActorRef[T]) {

private[this] val classicRef = ActorRefAdapter.toClassic(ref)

def provider: ActorRefProvider = classicRef.provider

def isTerminated: Boolean = classicRef.isTerminated
}
6 changes: 6 additions & 0 deletions src/main/scala/akka/lerna/InternalRecipientRef.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package akka.lerna

/**
* Expose internal API of Akka to use it in [[lerna]] package.
*/
trait InternalRecipientRef[-T] extends akka.actor.typed.internal.InternalRecipientRef[T]
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package lerna.akka.entityreplication

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.actor.{ ActorRef, ExtendedActorSystem, Extension, ExtensionId, Props }
import lerna.akka.entityreplication.model.TypeName
import lerna.akka.entityreplication.raft.eventsourced.{ CommitLogStore, ShardedCommitLogStore }

object ClusterReplication {
object ClusterReplication extends ExtensionId[ClusterReplication] {

def apply(system: ActorSystem): ClusterReplication = new ClusterReplication(system)
override def createExtension(system: ExtendedActorSystem): ClusterReplication = new ClusterReplication(system)

private val actorNamePrefix: String = "replicationRegion"

private[entityreplication] type EntityPropsProvider = ReplicationActorContext => Props
}

class ClusterReplication private (system: ActorSystem) {
class ClusterReplication private (system: ExtendedActorSystem) extends Extension {

import ClusterReplication._

Expand All @@ -21,6 +23,16 @@ class ClusterReplication private (system: ActorSystem) {
settings: ClusterReplicationSettings,
extractEntityId: ReplicationRegion.ExtractEntityId,
extractShardId: ReplicationRegion.ExtractShardId,
): ActorRef = {
internalStart(typeName, _ => entityProps, settings, extractEntityId, extractShardId)
}

private[entityreplication] def internalStart(
typeName: String,
entityProps: EntityPropsProvider,
settings: ClusterReplicationSettings,
extractEntityId: ReplicationRegion.ExtractEntityId,
extractShardId: ReplicationRegion.ExtractShardId,
): ActorRef = {
val _typeName = TypeName.from(typeName)

Expand All @@ -31,7 +43,7 @@ class ClusterReplication private (system: ActorSystem) {
Option.when(enabled)(new ShardedCommitLogStore(_typeName, system))
}

system.actorOf(
system.systemActorOf(
ReplicationRegion.props(typeName, entityProps, settings, extractEntityId, extractShardId, maybeCommitLogStore),
s"$actorNamePrefix-$typeName",
)
Expand Down
13 changes: 2 additions & 11 deletions src/main/scala/lerna/akka/entityreplication/ReplicationActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package lerna.akka.entityreplication

import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{ Actor, ActorPath, ActorRef, Cancellable, Stash }
import akka.actor.{ Actor, Cancellable, Stash }
import akka.event.Logging
import lerna.akka.entityreplication.model.{ EntityInstanceId, NormalizedEntityId }
import lerna.akka.entityreplication.raft.RaftProtocol._
Expand All @@ -15,18 +15,9 @@ private[entityreplication] object ReplicationActor {
private[this] val instanceIdCounter = new AtomicInteger(1)

private def generateInstanceId(): EntityInstanceId = EntityInstanceId(instanceIdCounter.getAndIncrement())

private[entityreplication] final case class TakeSnapshot(metadata: EntitySnapshotMetadata, replyTo: ActorRef)
private[entityreplication] final case class Snapshot(metadata: EntitySnapshotMetadata, state: EntityState)

private final case object RecoveryTimeout

private[entityreplication] final case class EntityRecoveryTimeoutException(entityPath: ActorPath)
extends RuntimeException
}

trait ReplicationActor[StateData] extends Actor with Stash with akka.lerna.StashFactory {
import ReplicationActor._
import context.dispatcher

private val internalStash = createStash()
Expand Down Expand Up @@ -92,7 +83,7 @@ trait ReplicationActor[StateData] extends Actor with Stash with akka.lerna.Stash

override def stateReceive(receive: Receive, message: Any): Unit =
message match {
case Command(command) =>
case ProcessCommand(command) =>
receive.applyOrElse[Any, Unit](
command,
command => {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package lerna.akka.entityreplication

import akka.actor.ActorRef

private[entityreplication] class ReplicationActorContext(val entityId: String, val shard: ActorRef)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import akka.cluster.sharding.ShardRegion.{ GracefulShutdown, HashCodeMessageExtr
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import akka.cluster.{ Cluster, Member, MemberStatus }
import akka.routing.{ ActorRefRoutee, ConsistentHashingRouter, ConsistentHashingRoutingLogic, Router }
import lerna.akka.entityreplication.ClusterReplication.EntityPropsProvider
import lerna.akka.entityreplication.ReplicationRegion.{ ExtractEntityId, ExtractShardId }
import lerna.akka.entityreplication.model._
import lerna.akka.entityreplication.raft.RaftActor
Expand All @@ -14,6 +15,7 @@ import lerna.akka.entityreplication.raft.eventsourced.CommitLogStore
import lerna.akka.entityreplication.raft.protocol.ShardRequest
import lerna.akka.entityreplication.raft.routing.MemberIndex
import lerna.akka.entityreplication.raft.snapshot.ShardSnapshotStore
import lerna.akka.entityreplication.typed.ClusterReplication.ShardCommand

import scala.collection.mutable

Expand Down Expand Up @@ -53,7 +55,7 @@ object ReplicationRegion {

private[entityreplication] def props(
typeName: String,
entityProps: Props,
entityProps: EntityPropsProvider,
settings: ClusterReplicationSettings,
extractEntityId: ExtractEntityId,
extractShardId: ExtractShardId,
Expand All @@ -63,7 +65,7 @@ object ReplicationRegion {

private[entityreplication] case class CreateShard(shardId: NormalizedShardId) extends ShardRequest

final case class Passivate(entityPath: ActorPath, stopMessage: Any)
final case class Passivate(entityPath: ActorPath, stopMessage: Any) extends ShardCommand

private[entityreplication] sealed trait RoutingCommand
private[entityreplication] final case class Broadcast(message: Any) extends RoutingCommand
Expand All @@ -80,7 +82,7 @@ object ReplicationRegion {

private[entityreplication] class ReplicationRegion(
typeName: String,
entityProps: Props,
entityProps: EntityPropsProvider,
settings: ClusterReplicationSettings,
extractEntityId: ExtractEntityId,
extractShardId: ExtractShardId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package lerna.akka.entityreplication.model

import akka.actor.ActorPath
import akka.util.ByteString
import lerna.akka.entityreplication.ReplicationRegion.EntityId

import java.net.URLEncoder
import java.net.{ URLDecoder, URLEncoder }

private[entityreplication] object NormalizedEntityId {
def from(entityId: EntityId): NormalizedEntityId = new NormalizedEntityId(URLEncoder.encode(entityId, "utf-8"))
Expand All @@ -14,4 +15,7 @@ private[entityreplication] object NormalizedEntityId {
new NormalizedEntityId(encodedEntityId)
}

private[entityreplication] final case class NormalizedEntityId private (underlying: String) extends AnyVal
private[entityreplication] final case class NormalizedEntityId private (underlying: String) extends AnyVal {

def raw: EntityId = URLDecoder.decode(underlying, ByteString.UTF_8)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor.ExtendedActorSystem
import akka.persistence.query.{ NoOffset, Offset, Sequence, TimeBasedUUID }
import akka.serialization.{ BaseSerializer, SerializationExtension, SerializerWithStringManifest, Serializers }
import com.google.protobuf.ByteString
import lerna.akka.entityreplication.{ model, raft, ClusterReplicationSerializable }
import lerna.akka.entityreplication.{ model, raft, typed, ClusterReplicationSerializable }

import java.io.NotSerializableException
import java.util.UUID
Expand Down Expand Up @@ -52,6 +52,8 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
private val TimeBasedUUIDManifest = "EE"
// raft.model
private val NoOpManifest = "FA"
// typed
private val ReplicationEnvelopeManifest = "GA"

// Manifest -> fromBinary
private val fromBinaryMap = HashMap[String, Array[Byte] => ClusterReplicationSerializable](
Expand Down Expand Up @@ -90,6 +92,8 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
TimeBasedUUIDManifest -> timeBasedUUIDEnvelopeFromBinary,
// raft.model
NoOpManifest -> noOpFromBinary,
// typed
ReplicationEnvelopeManifest -> replicationEnvelopeFromBinary,
)

override def manifest(o: AnyRef): String = {
Expand Down Expand Up @@ -156,6 +160,8 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
case _: TimeBasedUUIDEnvelope => TimeBasedUUIDManifest
// raft.model
case _: raft.model.NoOp.type => NoOpManifest
// typed
case _: typed.ReplicationEnvelope[_] => ReplicationEnvelopeManifest
}

private val serializableToBinary: PartialFunction[ClusterReplicationSerializable, Array[Byte]] = {
Expand Down Expand Up @@ -194,6 +200,8 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
case m: TimeBasedUUIDEnvelope => timeBasedUUIDEnvelopeToBinary(m)
// raft.model
case m: raft.model.NoOp.type => noOpToBinary(m)
// typed
case m: typed.ReplicationEnvelope[_] => replicationEnvelopeToBinary(m)
}

// ===
Expand Down Expand Up @@ -907,6 +915,26 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
)
}

// ===
// typed
// ===

private def replicationEnvelopeToBinary(message: typed.ReplicationEnvelope[_]): Array[Byte] = {
msg.ReplicationEnvelope
.of(
entityId = message.entityId,
message = payloadToProto(message.message),
).toByteArray
}

private def replicationEnvelopeFromBinary(bytes: Array[Byte]): typed.ReplicationEnvelope[_] = {
val proto = msg.ReplicationEnvelope.parseFrom(bytes)
typed.ReplicationEnvelope(
proto.entityId,
message = payloadFromProto(proto.message),
)
}

// ===
// payload
// ===
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package lerna.akka.entityreplication.raft

import lerna.akka.entityreplication.ReplicationActor
import lerna.akka.entityreplication.raft.RaftProtocol._
import lerna.akka.entityreplication.raft.protocol.RaftCommands._
import lerna.akka.entityreplication.raft.protocol.{ SuspendEntity, TryCreateEntity }
Expand Down Expand Up @@ -42,7 +41,7 @@ private[raft] trait Candidate { this: RaftActor =>
case response: SnapshotProtocol.FetchSnapshotResponse => receiveFetchSnapshotResponse(response)
case SuspendEntity(_, entityId, stopMessage) => suspendEntity(entityId, stopMessage)
case SnapshotTick => handleSnapshotTick()
case response: ReplicationActor.Snapshot => receiveEntitySnapshotResponse(response)
case response: Snapshot => receiveEntitySnapshotResponse(response)
case response: SnapshotProtocol.SaveSnapshotResponse => receiveSaveSnapshotResponse(response)
case _: akka.persistence.SaveSnapshotSuccess => // ignore
case _: akka.persistence.SaveSnapshotFailure => // ignore: no problem because events exist even if snapshot saving failed
Expand Down
Loading

0 comments on commit 165d1ce

Please sign in to comment.