Skip to content

Commit

Permalink
Merge pull request #172 from lerna-stack/252-impl-shard-id-extractor
Browse files Browse the repository at this point in the history
Add function extract shard id from entity id
  • Loading branch information
xirc authored Oct 5, 2022
2 parents 457f529 + 61951df commit 618a211
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[PR#163](https://github.com/lerna-stack/akka-entity-replication/pull/163)
- Add diagnostic logging to CommitLogStoreActor
[PR#164](https://github.com/lerna-stack/akka-entity-replication/pull/164)
- Add function extracting shard id from entity id to lerna.akka.entityreplication.typed.ClusterReplication
[PR#172](https://github.com/lerna-stack/akka-entity-replication/pull/172)

### Fixed
- RaftActor might delete committed entries
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# New function is added in lerna.akka.entityreplication.typed.ClusterReplication.
ProblemFilters.exclude[ReversedMissingMethodProblem]("lerna.akka.entityreplication.typed.ClusterReplication.shardIdOf")
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,13 @@ trait ClusterReplication extends Extension {
*/
def entityRefFor[M](typeKey: ReplicatedEntityTypeKey[M], entityId: String): ReplicatedEntityRef[M]

/**
* Extract shard id of given entity id.
*
* @param typeKey
* @param entityId
* @tparam M the type parameter of the typeKey
* @return shard id
*/
def shardIdOf[M](typeKey: ReplicatedEntityTypeKey[M], entityId: String): String
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,27 @@ import java.util.concurrent.ConcurrentHashMap

private[entityreplication] class ClusterReplicationImpl(system: ActorSystem[_]) extends ClusterReplication {

private[this] val regions: concurrent.Map[ReplicatedEntityTypeKey[Nothing], ActorRef[Nothing]] =
new ConcurrentHashMap[ReplicatedEntityTypeKey[_], ActorRef[_]].asScala
private case class ReplicationRegionEntry(settings: ClusterReplicationSettings, regionRef: ActorRef[Nothing])

private[this] val regions: concurrent.Map[ReplicatedEntityTypeKey[Nothing], ReplicationRegionEntry] =
new ConcurrentHashMap[ReplicatedEntityTypeKey[_], ReplicationRegionEntry].asScala

override def init[M, E](entity: ReplicatedEntity[M, E]): ActorRef[E] =
regions.getOrElseUpdate(entity.typeKey, internalInit(entity)).unsafeUpcast[E]
regions
.getOrElseUpdate(
entity.typeKey,
internalInit(entity),
).regionRef.unsafeUpcast[E]

private[this] def internalInit[M, E](entity: ReplicatedEntity[M, E]): ActorRef[E] = {
private[this] def internalInit[M, E](entity: ReplicatedEntity[M, E]): ReplicationRegionEntry = {
val classicSystem = system.toClassic
val settings = entity.settings.getOrElse(untyped.ClusterReplicationSettings.create(classicSystem))
val settings = entity.settings.getOrElse(ClusterReplicationSettings(system))
val extractEntityId: untyped.ReplicationRegion.ExtractEntityId = {
case ReplicationEnvelope(entityId, message) => (entityId, message)
}
val extractShardId: untyped.ReplicationRegion.ExtractShardId = {
case ReplicationEnvelope(entityId, _) =>
Math.abs(entityId.hashCode % settings.raftSettings.numberOfShards).toString
shardIdOf(settings, entityId)
}
val possibleShardIds: Set[untyped.ReplicationRegion.ShardId] = {
(0 until settings.raftSettings.numberOfShards).map(_.toString).toSet
Expand Down Expand Up @@ -67,13 +73,31 @@ private[entityreplication] class ClusterReplicationImpl(system: ActorSystem[_])
extractShardId = extractShardId,
possibleShardIds = possibleShardIds,
)
region.toTyped
ReplicationRegionEntry(settings, region.toTyped)
}

override def entityRefFor[M](typeKey: ReplicatedEntityTypeKey[M], entityId: String): ReplicatedEntityRef[M] =
regions.get(typeKey) match {
case Some(region) =>
case Some(ReplicationRegionEntry(_, region)) =>
new ReplicatedEntityRefImpl[M](typeKey, entityId, region.unsafeUpcast[ReplicationEnvelope[M]], system)
case None => throw new IllegalStateException(s"The type [${typeKey}] must be init first")
}

override def shardIdOf[M](
typeKey: ReplicatedEntityTypeKey[M],
entityId: String,
): untyped.ReplicationRegion.ShardId = {
regions.get(typeKey) match {
case Some(ReplicationRegionEntry(settings, _)) =>
shardIdOf(settings, entityId)
case None =>
throw new IllegalStateException(s"The type [${typeKey}] must be init first")
}
}

private def shardIdOf(
settings: ClusterReplicationSettings,
entityId: String,
): String =
Math.abs(entityId.hashCode % settings.raftSettings.numberOfShards).toString
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,28 @@ class ClusterReplicationSpec extends FlatSpec with Matchers with ScalaFutures wi

clusterReplication.entityRefFor(typeKey, "test") shouldBe a[ReplicatedEntityRef[_]]
}

behavior of "ClusterReplication.shardIdOf"

it should "throw an exception if the typeKey has not initialized" in {
val typeKey = ReplicatedEntityTypeKey[NotUsed]("NotInitialized")
val entityId = "entity-id"
val exception = intercept[IllegalStateException] {
clusterReplication.shardIdOf(typeKey, entityId)
}
exception.getMessage should be(
"The type [ReplicatedEntityTypeKey[akka.NotUsed](NotInitialized)] must be init first",
)
}

it should "extract shardId from given entityId" in {
val typeKey = ReplicatedEntityTypeKey[NotUsed]("ExtractShardId")
val entity = ReplicatedEntity(typeKey)(_ => Behaviors.empty)
clusterReplication.init(entity)

val entityId = "entity-id"
val shardId = clusterReplication.shardIdOf(typeKey, entityId)
val settings = ClusterReplicationSettings(actorTestKit.system)
assert(shardId.toInt >= 0 && shardId.toInt < settings.raftSettings.numberOfShards)
}
}

0 comments on commit 618a211

Please sign in to comment.