Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add function extract shard id from entity id #172

Merged
merged 12 commits into from
Oct 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[PR#163](https://github.com/lerna-stack/akka-entity-replication/pull/163)
- Add diagnostic logging to CommitLogStoreActor
[PR#164](https://github.com/lerna-stack/akka-entity-replication/pull/164)
- Add function extracting shard id from entity id to lerna.akka.entityreplication.typed.ClusterReplication
[PR#172](https://github.com/lerna-stack/akka-entity-replication/pull/172)

### Fixed
- RaftActor might delete committed entries
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# New function is added in lerna.akka.entityreplication.typed.ClusterReplication.
ProblemFilters.exclude[ReversedMissingMethodProblem]("lerna.akka.entityreplication.typed.ClusterReplication.shardIdOf")
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,13 @@ trait ClusterReplication extends Extension {
*/
def entityRefFor[M](typeKey: ReplicatedEntityTypeKey[M], entityId: String): ReplicatedEntityRef[M]

/**
* Extract shard id of given entity id.
*
* @param typeKey
* @param entityId
* @tparam M the type parameter of the typeKey
* @return shard id
*/
def shardIdOf[M](typeKey: ReplicatedEntityTypeKey[M], entityId: String): String
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,27 @@ import java.util.concurrent.ConcurrentHashMap

private[entityreplication] class ClusterReplicationImpl(system: ActorSystem[_]) extends ClusterReplication {

private[this] val regions: concurrent.Map[ReplicatedEntityTypeKey[Nothing], ActorRef[Nothing]] =
new ConcurrentHashMap[ReplicatedEntityTypeKey[_], ActorRef[_]].asScala
private case class ReplicationRegionEntry(settings: ClusterReplicationSettings, regionRef: ActorRef[Nothing])

private[this] val regions: concurrent.Map[ReplicatedEntityTypeKey[Nothing], ReplicationRegionEntry] =
new ConcurrentHashMap[ReplicatedEntityTypeKey[_], ReplicationRegionEntry].asScala

override def init[M, E](entity: ReplicatedEntity[M, E]): ActorRef[E] =
regions.getOrElseUpdate(entity.typeKey, internalInit(entity)).unsafeUpcast[E]
regions
.getOrElseUpdate(
entity.typeKey,
internalInit(entity),
).regionRef.unsafeUpcast[E]

private[this] def internalInit[M, E](entity: ReplicatedEntity[M, E]): ActorRef[E] = {
private[this] def internalInit[M, E](entity: ReplicatedEntity[M, E]): ReplicationRegionEntry = {
val classicSystem = system.toClassic
val settings = entity.settings.getOrElse(untyped.ClusterReplicationSettings.create(classicSystem))
val settings = entity.settings.getOrElse(ClusterReplicationSettings(system))
val extractEntityId: untyped.ReplicationRegion.ExtractEntityId = {
case ReplicationEnvelope(entityId, message) => (entityId, message)
}
val extractShardId: untyped.ReplicationRegion.ExtractShardId = {
case ReplicationEnvelope(entityId, _) =>
Math.abs(entityId.hashCode % settings.raftSettings.numberOfShards).toString
shardIdOf(settings, entityId)
}
val possibleShardIds: Set[untyped.ReplicationRegion.ShardId] = {
(0 until settings.raftSettings.numberOfShards).map(_.toString).toSet
Expand Down Expand Up @@ -67,13 +73,31 @@ private[entityreplication] class ClusterReplicationImpl(system: ActorSystem[_])
extractShardId = extractShardId,
possibleShardIds = possibleShardIds,
)
region.toTyped
ReplicationRegionEntry(settings, region.toTyped)
}

override def entityRefFor[M](typeKey: ReplicatedEntityTypeKey[M], entityId: String): ReplicatedEntityRef[M] =
regions.get(typeKey) match {
case Some(region) =>
case Some(ReplicationRegionEntry(_, region)) =>
new ReplicatedEntityRefImpl[M](typeKey, entityId, region.unsafeUpcast[ReplicationEnvelope[M]], system)
case None => throw new IllegalStateException(s"The type [${typeKey}] must be init first")
}

override def shardIdOf[M](
typeKey: ReplicatedEntityTypeKey[M],
entityId: String,
): untyped.ReplicationRegion.ShardId = {
regions.get(typeKey) match {
case Some(ReplicationRegionEntry(settings, _)) =>
shardIdOf(settings, entityId)
case None =>
throw new IllegalStateException(s"The type [${typeKey}] must be init first")
}
}

private def shardIdOf(
settings: ClusterReplicationSettings,
entityId: String,
): String =
Math.abs(entityId.hashCode % settings.raftSettings.numberOfShards).toString
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,28 @@ class ClusterReplicationSpec extends FlatSpec with Matchers with ScalaFutures wi

clusterReplication.entityRefFor(typeKey, "test") shouldBe a[ReplicatedEntityRef[_]]
}

behavior of "ClusterReplication.shardIdOf"

it should "throw an exception if the typeKey has not initialized" in {
val typeKey = ReplicatedEntityTypeKey[NotUsed]("NotInitialized")
val entityId = "entity-id"
val exception = intercept[IllegalStateException] {
clusterReplication.shardIdOf(typeKey, entityId)
}
exception.getMessage should be(
"The type [ReplicatedEntityTypeKey[akka.NotUsed](NotInitialized)] must be init first",
)
}

it should "extract shardId from given entityId" in {
val typeKey = ReplicatedEntityTypeKey[NotUsed]("ExtractShardId")
val entity = ReplicatedEntity(typeKey)(_ => Behaviors.empty)
clusterReplication.init(entity)

val entityId = "entity-id"
val shardId = clusterReplication.shardIdOf(typeKey, entityId)
val settings = ClusterReplicationSettings(actorTestKit.system)
assert(shardId.toInt >= 0 && shardId.toInt < settings.raftSettings.numberOfShards)
}
}