Skip to content

Commit

Permalink
Add a layer to simplify local testing (#152)
Browse files Browse the repository at this point in the history
* Add a layer to simplify local testing

* Make things public
  • Loading branch information
ghostdogpr authored Jan 6, 2025
1 parent 960ab6e commit 3b8dea2
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ object Pods {

/**
* A layer that creates a service that does nothing when called.
* Useful for testing ShardManager or when using Sharding.local.
* Useful for testing ShardManager or when we don't need messages being sent.
*/
val noop: ULayer[Pods] =
ZLayer.succeed(new Pods {
Expand Down
121 changes: 121 additions & 0 deletions entities/src/main/scala/com/devsisters/shardcake/LocalSharding.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package com.devsisters.shardcake

import com.devsisters.shardcake.interfaces.{ Pods, Serialization, Storage }
import com.devsisters.shardcake.interfaces.Pods.BinaryMessage
import zio.{ Promise, Queue, RLayer, Task, ULayer, URLayer, ZIO, ZLayer }
import zio.stream.ZStream

object LocalSharding {

trait LocalQueue {
def localQueue: Queue[LocalQueueMessage]
}

sealed trait LocalQueueMessage
object LocalQueueMessage {
case class SendMessage(request: BinaryMessage, response: Promise[Nothing, Option[Array[Byte]]])
extends LocalQueueMessage
case class SendStream(
request: ZStream[Any, Throwable, BinaryMessage],
response: Promise[Nothing, Option[Array[Byte]]]
) extends LocalQueueMessage
case class SendMessageAndReceiveStream(
request: BinaryMessage,
response: Promise[Nothing, ZStream[Any, Throwable, Array[Byte]]]
) extends LocalQueueMessage
case class SendStreamAndReceiveStream(
request: ZStream[Any, Throwable, BinaryMessage],
response: Promise[Nothing, ZStream[Any, Throwable, Array[Byte]]]
) extends LocalQueueMessage
}

val localQueue: ULayer[LocalQueue] =
ZLayer(
Queue
.unbounded[LocalQueueMessage]
.map(queue =>
new LocalQueue {
def localQueue: Queue[LocalQueueMessage] = queue
}
)
)

val localPods: URLayer[LocalQueue, Pods] =
ZLayer {
ZIO.serviceWith[LocalQueue](_.localQueue).map { queue =>
new Pods {
def assignShards(pod: PodAddress, shards: Set[ShardId]): Task[Unit] = ZIO.unit
def unassignShards(pod: PodAddress, shards: Set[ShardId]): Task[Unit] = ZIO.unit
def ping(pod: PodAddress): Task[Unit] = ZIO.unit

def sendMessage(pod: PodAddress, message: BinaryMessage): Task[Option[Array[Byte]]] =
Promise.make[Nothing, Option[Array[Byte]]].flatMap { promise =>
queue.offer(LocalQueueMessage.SendMessage(message, promise)) *> promise.await
}

def sendStream(
pod: PodAddress,
entityId: String,
messages: ZStream[Any, Throwable, BinaryMessage]
): Task[Option[Array[Byte]]] =
Promise.make[Nothing, Option[Array[Byte]]].flatMap { promise =>
queue.offer(LocalQueueMessage.SendStream(messages, promise)).fork *> promise.await
}

def sendMessageAndReceiveStream(
pod: PodAddress,
message: BinaryMessage
): ZStream[Any, Throwable, Array[Byte]] =
ZStream.unwrap {
Promise.make[Nothing, ZStream[Any, Throwable, Array[Byte]]].flatMap { promise =>
queue.offer(LocalQueueMessage.SendMessageAndReceiveStream(message, promise)) *> promise.await
}
}

def sendStreamAndReceiveStream(
pod: PodAddress,
entityId: String,
messages: ZStream[Any, Throwable, BinaryMessage]
): ZStream[Any, Throwable, Array[Byte]] =
ZStream.unwrap {
Promise.make[Nothing, ZStream[Any, Throwable, Array[Byte]]].flatMap { promise =>
queue.offer(LocalQueueMessage.SendStreamAndReceiveStream(messages, promise)).fork *> promise.await
}
}
}
}
}

val localServer: RLayer[Sharding with LocalQueue, Unit] =
ZLayer.scoped {
for {
sharding <- ZIO.service[Sharding]
queue <- ZIO.serviceWith[LocalQueue](_.localQueue)
_ <- ZStream
.fromQueueWithShutdown(queue)
.runForeach {
case LocalQueueMessage.SendMessage(request, response) =>
sharding.sendToLocalEntity(request).flatMap(response.succeed)
case LocalQueueMessage.SendStream(request, response) =>
sharding.sendStreamToLocalEntity(request).flatMap(response.succeed)
case LocalQueueMessage.SendMessageAndReceiveStream(request, response) =>
response.succeed(sharding.sendToLocalEntityAndReceiveStream(request))
case LocalQueueMessage.SendStreamAndReceiveStream(request, response) =>
response.succeed(sharding.sendStreamToLocalEntityAndReceiveStream(request))
}
.forkScoped
} yield ()
}

/**
* A special layer meant for testing that uses a local queue rather than an external transport.
* This layer will only work in a single JVM and is not suitable for production use.
*/
val live: RLayer[ShardManagerClient with Storage with Serialization with Config, Sharding] =
ZLayer.makeSome[ShardManagerClient with Storage with Serialization with Config, Sharding](
localQueue,
localPods,
localServer,
Sharding.live
)
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package com.devsisters.shardcake

import com.devsisters.shardcake.interfaces.{ Pods, Serialization, Storage }
import zio.{ Config => _, _ }
import com.devsisters.shardcake.interfaces.{ Serialization, Storage }
import zio.test.TestAspect.{ sequential, withLiveClock }
import zio.test._
import zio.{ Config => _, _ }

import scala.util.Success

object BroadcastingSpec extends ZIOSpecDefault {

private val config = ZLayer.succeed(Config.default)
private val config = ZLayer.succeed(Config.default.copy(simulateRemotePods = true))

def spec: Spec[TestEnvironment with Scope, Any] =
suite("BroadcastingSpec")(
Expand All @@ -28,9 +28,8 @@ object BroadcastingSpec extends ZIOSpecDefault {
}
}
).provideShared(
Sharding.live,
Serialization.javaSerialization,
Pods.noop,
LocalSharding.live,
ShardManagerClient.local,
Storage.memory,
config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package com.devsisters.shardcake

import com.devsisters.shardcake.CounterActor.CounterMessage._
import com.devsisters.shardcake.CounterActor._
import com.devsisters.shardcake.interfaces.{ Pods, Serialization, Storage }
import zio.{ Config => _, _ }
import com.devsisters.shardcake.interfaces.{ Serialization, Storage }
import zio.stream.{ SubscriptionRef, ZStream }
import zio.test.TestAspect.{ sequential, withLiveClock }
import zio.test._
import zio.{ Config => _, _ }

object ShardingSpec extends ZIOSpecDefault {
def spec: Spec[TestEnvironment with Scope, Any] =
Expand Down Expand Up @@ -144,9 +144,8 @@ object ShardingSpec extends ZIOSpecDefault {
}
}
).provideShared(
Sharding.live,
Serialization.javaSerialization,
Pods.noop,
LocalSharding.live,
ShardManagerClient.local,
Storage.memory,
ZLayer.succeed(Config.default)
Expand Down

0 comments on commit 3b8dea2

Please sign in to comment.