Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make internal API private #47

Merged
merged 5 commits into from
Apr 6, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ object ClusterReplication {
val actorNamePrefix: String = "replicationRegion"
tksugimoto marked this conversation as resolved.
Show resolved Hide resolved
}

class ClusterReplication(system: ActorSystem) {
class ClusterReplication private (system: ActorSystem) {

import ClusterReplication._

Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
package lerna.akka.entityreplication
trait ClusterReplicationSerializable extends Serializable
private[entityreplication] trait ClusterReplicationSerializable extends Serializable
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@ import lerna.akka.entityreplication.raft.model.{ LogEntryIndex, NoOp }
import lerna.akka.entityreplication.raft.protocol.SnapshotOffer
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol._

object ReplicationActor {
private[entityreplication] object ReplicationActor {

private val instanceIdCounter = new AtomicInteger(1)
private[this] val instanceIdCounter = new AtomicInteger(1)

private def generateInstanceId(): EntityInstanceId = EntityInstanceId(instanceIdCounter.getAndIncrement())

final case class TakeSnapshot(metadata: EntitySnapshotMetadata, replyTo: ActorRef)
final case class Snapshot(metadata: EntitySnapshotMetadata, state: EntityState)
private[entityreplication] final case class TakeSnapshot(metadata: EntitySnapshotMetadata, replyTo: ActorRef)
private[entityreplication] final case class Snapshot(metadata: EntitySnapshotMetadata, state: EntityState)

private final case object RecoveryTimeout

final case class EntityRecoveryTimeoutException(entityPath: ActorPath) extends RuntimeException
private[entityreplication] final case class EntityRecoveryTimeoutException(entityPath: ActorPath)
extends RuntimeException
}

trait ReplicationActor[StateData] extends Actor with Stash with akka.lerna.StashFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ object ReplicationRegion {

private[entityreplication] type ExtractNormalizedShardId = PartialFunction[Msg, NormalizedShardId]

def props(
private[entityreplication] def props(
typeName: String,
entityProps: Props,
settings: ClusterReplicationSettings,
Expand All @@ -61,24 +61,24 @@ object ReplicationRegion {
) =
Props(new ReplicationRegion(typeName, entityProps, settings, extractEntityId, extractShardId, maybeCommitLogStore))

case class CreateShard(shardId: NormalizedShardId) extends ShardRequest
private[entityreplication] case class CreateShard(shardId: NormalizedShardId) extends ShardRequest

final case class Passivate(entityPath: ActorPath, stopMessage: Any)

sealed trait RoutingCommand
final case class Broadcast(message: Any) extends RoutingCommand
final case class BroadcastWithoutSelf(message: Any) extends RoutingCommand
final case class DeliverTo(index: MemberIndex, message: Any) extends RoutingCommand
final case class DeliverSomewhere(message: Any) extends RoutingCommand
private[entityreplication] sealed trait RoutingCommand
private[entityreplication] final case class Broadcast(message: Any) extends RoutingCommand
private[entityreplication] final case class BroadcastWithoutSelf(message: Any) extends RoutingCommand
private[entityreplication] final case class DeliverTo(index: MemberIndex, message: Any) extends RoutingCommand
private[entityreplication] final case class DeliverSomewhere(message: Any) extends RoutingCommand

/**
* [[ReplicationRegion]] 同士の通信で利用。適切なノードにメッセージがルーティング済みであることを表す
* @param message
*/
final case class Routed(message: Any)
private[entityreplication] final case class Routed(message: Any)
}

class ReplicationRegion(
private[entityreplication] class ReplicationRegion(
typeName: String,
entityProps: Props,
settings: ClusterReplicationSettings,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package lerna.akka.entityreplication.model

final case class EntityInstanceId(underlying: Int) extends AnyVal
private[entityreplication] final case class EntityInstanceId(underlying: Int) extends AnyVal
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import lerna.akka.entityreplication.ReplicationRegion.EntityId

import java.net.URLEncoder

object NormalizedEntityId {
private[entityreplication] object NormalizedEntityId {
def from(entityId: EntityId): NormalizedEntityId = new NormalizedEntityId(URLEncoder.encode(entityId, "utf-8"))

def of(entityPath: ActorPath): NormalizedEntityId = new NormalizedEntityId(entityPath.name)
Expand All @@ -14,4 +14,4 @@ object NormalizedEntityId {
new NormalizedEntityId(encodedEntityId)
}

final case class NormalizedEntityId private (underlying: String) extends AnyVal
private[entityreplication] final case class NormalizedEntityId private (underlying: String) extends AnyVal
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.net.{ URLDecoder, URLEncoder }

import akka.actor.ActorPath

object NormalizedShardId {
private[entityreplication] object NormalizedShardId {
def from(shardId: String): NormalizedShardId = new NormalizedShardId(URLEncoder.encode(shardId, "utf-8"))

private[entityreplication] def from(path: ActorPath) = new NormalizedShardId(path.name)
Expand All @@ -13,6 +13,6 @@ object NormalizedShardId {
new NormalizedShardId(encodedShardId)
}

final case class NormalizedShardId private (underlying: String) extends AnyVal {
private[entityreplication] final case class NormalizedShardId private (underlying: String) extends AnyVal {
def raw: String = URLDecoder.decode(underlying, "utf-8")
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package lerna.akka.entityreplication.model

import java.net.URLEncoder

object TypeName {
private[entityreplication] object TypeName {
def from(typeName: String): TypeName = new TypeName(URLEncoder.encode(typeName, "utf-8"))
}

final class TypeName private (val underlying: String) extends AnyVal {
private[entityreplication] final class TypeName private (val underlying: String) extends AnyVal {
override def toString: String = underlying
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import lerna.akka.entityreplication.raft.protocol.{ SuspendEntity, TryCreateEnti
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol
import lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager

trait Candidate { this: RaftActor =>
private[raft] trait Candidate { this: RaftActor =>
import RaftActor._

def candidateBehavior: Receive = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol
import lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager
import lerna.akka.entityreplication.{ ReplicationActor, ReplicationRegion }

trait Follower { this: RaftActor =>
private[raft] trait Follower { this: RaftActor =>
import RaftActor._

def followerBehavior: Receive = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol
import lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager
import lerna.akka.entityreplication.{ ReplicationActor, ReplicationRegion }

trait Leader { this: RaftActor =>
private[raft] trait Leader { this: RaftActor =>
import RaftActor._

def leaderBehavior: Receive = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager
import lerna.akka.entityreplication.util.ActorIds
import lerna.akka.entityreplication.{ ClusterReplicationSerializable, ReplicationActor, ReplicationRegion }

object RaftActor {
private[entityreplication] object RaftActor {

def props(
typeName: TypeName,
Expand Down Expand Up @@ -95,7 +95,7 @@ object RaftActor {
trait NonPersistEventLike extends NonPersistEvent // テスト用
}

class RaftActor(
private[raft] class RaftActor(
typeName: TypeName,
val extractEntityId: PartialFunction[Msg, (NormalizedEntityId, Msg)],
replicationActorProps: Props,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import akka.persistence.{ PersistentActor, RecoveryCompleted, SnapshotOffer }
import lerna.akka.entityreplication.raft.PersistentStateData.PersistentState
import lerna.akka.entityreplication.raft.RaftActor._

object RaftActorBase {
private[raft] object RaftActorBase {

object `->` {
def unapply(in: (State, State)) = Some(in)
}
}

trait RaftActorBase extends PersistentActor with ActorLogging {
private[raft] trait RaftActorBase extends PersistentActor with ActorLogging {

type TransitionHandler = PartialFunction[(State, State), Unit]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import lerna.akka.entityreplication.raft.model._
import lerna.akka.entityreplication.raft.routing.MemberIndex
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol.EntitySnapshotMetadata

object PersistentStateData {
private[entityreplication] object PersistentStateData {

final case class PersistentState(
currentTerm: Term,
Expand All @@ -16,7 +16,7 @@ object PersistentStateData {
) extends ClusterReplicationSerializable
}

trait PersistentStateData[T <: PersistentStateData[T]] {
private[entityreplication] trait PersistentStateData[T <: PersistentStateData[T]] {
import PersistentStateData._

def currentTerm: Term
Expand All @@ -35,7 +35,7 @@ trait PersistentStateData[T <: PersistentStateData[T]] {
PersistentState(currentTerm, votedFor, replicatedLog, lastSnapshotStatus)
}

trait VolatileStateData[T <: VolatileStateData[T]] {
private[entityreplication] trait VolatileStateData[T <: VolatileStateData[T]] {
def commitIndex: LogEntryIndex
def lastApplied: LogEntryIndex
def snapshottingProgress: SnapshottingProgress
Expand All @@ -47,7 +47,7 @@ trait VolatileStateData[T <: VolatileStateData[T]] {
): T
}

trait FollowerData { self: RaftMemberData =>
private[entityreplication] trait FollowerData { self: RaftMemberData =>
def leaderMember: Option[MemberIndex]

def initializeFollowerData(): RaftMemberData = {
Expand Down Expand Up @@ -101,7 +101,7 @@ trait FollowerData { self: RaftMemberData =>
protected def updateFollowerVolatileState(leaderMember: Option[MemberIndex] = leaderMember): RaftMemberData
}

trait CandidateData { self: RaftMemberData =>
private[entityreplication] trait CandidateData { self: RaftMemberData =>
def acceptedMembers: Set[MemberIndex]

def initializeCandidateData(): RaftMemberData = {
Expand All @@ -122,7 +122,7 @@ trait CandidateData { self: RaftMemberData =>
protected def updateCandidateVolatileState(acceptedMembers: Set[MemberIndex]): RaftMemberData
}

trait LeaderData { self: RaftMemberData =>
private[entityreplication] trait LeaderData { self: RaftMemberData =>
def nextIndex: Option[NextIndex]
def matchIndex: MatchIndex
def clients: Map[LogEntryIndex, ClientContext]
Expand Down Expand Up @@ -205,7 +205,7 @@ trait LeaderData { self: RaftMemberData =>
): RaftMemberData
}

object RaftMemberData {
private[entityreplication] object RaftMemberData {
import PersistentStateData._

def apply(persistentState: PersistentState): RaftMemberData = {
Expand Down Expand Up @@ -248,7 +248,7 @@ object RaftMemberData {
)
}

trait RaftMemberData
private[entityreplication] trait RaftMemberData
extends PersistentStateData[RaftMemberData]
with VolatileStateData[RaftMemberData]
with FollowerData
Expand Down Expand Up @@ -347,7 +347,7 @@ trait RaftMemberData

}

final case class RaftMemberDataImpl(
private[entityreplication] final case class RaftMemberDataImpl(
currentTerm: Term,
votedFor: Option[MemberIndex],
replicatedLog: ReplicatedLog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import lerna.akka.entityreplication.model.{ EntityInstanceId, NormalizedEntityId
import lerna.akka.entityreplication.raft.model.{ LogEntry, LogEntryIndex }
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol.EntitySnapshot

object RaftProtocol {
private[entityreplication] object RaftProtocol {

final case class RequestRecovery(entityId: NormalizedEntityId)
final case class RecoveryState(events: Seq[LogEntry], snapshot: Option[EntitySnapshot])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import scala.jdk.DurationConverters._
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.util.Random

object RaftSettings {
private[entityreplication] object RaftSettings {
tksugimoto marked this conversation as resolved.
Show resolved Hide resolved
def apply(root: Config) = new RaftSettings(root)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package lerna.akka.entityreplication.raft.eventsourced
import lerna.akka.entityreplication.model.NormalizedShardId
import lerna.akka.entityreplication.raft.model.LogEntryIndex

trait CommitLogStore {
private[entityreplication] trait CommitLogStore {
private[raft] def save(
shardId: NormalizedShardId,
index: LogEntryIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ private[entityreplication] final case class Save(
committedEvent: Any,
) extends ClusterReplicationSerializable

object CommitLogStoreActor {
private[entityreplication] object CommitLogStoreActor {

def startClusterSharding(typeName: TypeName, system: ActorSystem): ActorRef = {
val clusterSharding = ClusterSharding(system)
Expand All @@ -44,7 +44,7 @@ object CommitLogStoreActor {
private def props(typeName: TypeName): Props = Props(new CommitLogStoreActor(typeName))
}

class CommitLogStoreActor(typeName: TypeName) extends PersistentActor {
private[entityreplication] class CommitLogStoreActor(typeName: TypeName) extends PersistentActor {
// TODO: 複数 Raft(typeName) に対応するために typeName ごとに cassandra-journal.keyspace を分ける
override def journalPluginId: String =
context.system.settings.config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ import lerna.akka.entityreplication.ClusterReplicationSerializable
/**
* index を揃えるために InternalEvent も永続化必要
*/
case object InternalEvent extends ClusterReplicationSerializable
private[entityreplication] case object InternalEvent extends ClusterReplicationSerializable
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import lerna.akka.entityreplication.raft.model.LogEntryIndex
import scala.jdk.DurationConverters._
import scala.concurrent.duration.FiniteDuration

class ShardedCommitLogStore(typeName: TypeName, system: ActorSystem) extends CommitLogStore {
private[entityreplication] class ShardedCommitLogStore(typeName: TypeName, system: ActorSystem) extends CommitLogStore {
import system.dispatcher
private implicit val scheduler: Scheduler = system.scheduler

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,8 @@ package lerna.akka.entityreplication.raft.model
import akka.actor.ActorRef
import lerna.akka.entityreplication.model.EntityInstanceId

case class ClientContext(ref: ActorRef, instanceId: Option[EntityInstanceId], originSender: Option[ActorRef])
private[entityreplication] final case class ClientContext(
ref: ActorRef,
instanceId: Option[EntityInstanceId],
originSender: Option[ActorRef],
)
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ package lerna.akka.entityreplication.raft.model

import lerna.akka.entityreplication.model.NormalizedEntityId

final case class EntityEvent(entityId: Option[NormalizedEntityId], event: Any)
private[entityreplication] final case class EntityEvent(entityId: Option[NormalizedEntityId], event: Any)
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package lerna.akka.entityreplication.raft.model

object LogEntry {
private[entityreplication] object LogEntry {

def apply(index: LogEntryIndex, event: EntityEvent, term: Term) =
new LogEntry(index, event, term)
}

class LogEntry(val index: LogEntryIndex, val event: EntityEvent, val term: Term) extends Serializable {
private[entityreplication] class LogEntry(val index: LogEntryIndex, val event: EntityEvent, val term: Term)
extends Serializable {
require(index > LogEntryIndex.initial())

def canEqual(other: Any): Boolean = other.isInstanceOf[LogEntry]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package lerna.akka.entityreplication.raft.model

import lerna.akka.entityreplication.raft.model.exception.SeqIndexOutOfBoundsException

object LogEntryIndex {
private[entityreplication] object LogEntryIndex {

def initial(): LogEntryIndex = LogEntryIndex(0)

Expand All @@ -11,7 +11,8 @@ object LogEntryIndex {
}
}

case class LogEntryIndex(private[entityreplication] val underlying: Long) extends Ordered[LogEntryIndex] {
private[entityreplication] final case class LogEntryIndex(private[entityreplication] val underlying: Long)
extends Ordered[LogEntryIndex] {
require(underlying >= 0)

def next(): LogEntryIndex = copy(underlying + 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package lerna.akka.entityreplication.raft.model

import lerna.akka.entityreplication.raft.routing.MemberIndex

case class MatchIndex(indexes: Map[MemberIndex, LogEntryIndex] = Map()) {
private[entityreplication] final case class MatchIndex(indexes: Map[MemberIndex, LogEntryIndex] = Map()) {

def update(follower: MemberIndex, index: LogEntryIndex): MatchIndex = {
copy(indexes + (follower -> index))
Expand Down
Loading