Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

simplify StreamsBasedServer example #179

Merged
merged 8 commits into from
Oct 21, 2020
76 changes: 34 additions & 42 deletions examples/src/main/scala/StreamsBasedServer.scala
Original file line number Diff line number Diff line change
@@ -1,54 +1,46 @@
package zio.nio.examples

import zio._
import zio.clock.Clock
import zio.console.Console
import zio.duration._
import zio.nio.core.SocketAddress
import zio.nio.channels._
import zio.stream._

object StreamsBasedServer extends App {

def run(args: List[String]) =
ZStream
.managed(server(8080))
.flatMap(handleConnections(_) { chunk =>
console.putStrLn(s"Read data: ${chunk.mkString}") *>
clock.sleep(2.seconds) *>
console.putStrLn("Done")
})
.runDrain
.orDie
override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] =
server(8080, 16).orDie
.as(0)

def server(port: Int): Managed[Exception, AsynchronousServerSocketChannel] =
for {
server <- AsynchronousServerSocketChannel()
socketAddress <- SocketAddress.inetSocketAddress(port).toManaged_
_ <- server.bind(socketAddress).toManaged_
} yield server
def server(port: Int, parallelism: Int): ZIO[ZEnv, Exception, Unit] =
AsynchronousServerSocketChannel()
.use(socket =>
for {
_ <- SocketAddress.inetSocketAddress("localhost", port) >>= socket.bind
_ <- ZStream
.repeatEffect(socket.accept.preallocate)
.map(_.withEarlyRelease)
.mapMPar(parallelism)(_.use((handleChannel _).tupled))
.runDrain
} yield ()
)

def handleConnections[R <: console.Console](
server: AsynchronousServerSocketChannel
)(f: String => RIO[R, Unit]): ZStream[R, Throwable, Unit] =
ZStream
.repeatEffect(server.accept.preallocate)
.map(conn => ZStream.managed(conn.ensuring(console.putStrLn("Connection closed")).withEarlyRelease))
.flatMapPar[R, Throwable, Unit](16) { connection =>
connection
.mapM {
case (closeConn, channel) =>
for {
_ <- console.putStrLn("Received connection")
data <- ZStream
.fromEffectOption(
channel.read(64).tap(_ => console.putStrLn("Read chunk")).orElse(ZIO.fail(None))
)
.take(4)
.transduce(ZSink.utf8DecodeChunk)
.run(Sink.foldLeft("")(_ + (_: String)))
_ <- closeConn
_ <- f(data)
} yield ()
}
}
def handleChannel(
closeConn: URIO[Any, Any],
channel: AsynchronousSocketChannel
): ZIO[Clock with Console, Nothing, Unit] =
for {
_ <- console.putStrLn("Received connection")
data <- ZStream
.fromEffectOption(
channel.read(64).tap(_ => console.putStrLn("Read chunk")).orElse(ZIO.fail(None))
)
.take(4)
.transduce(ZSink.utf8DecodeChunk)
.run(Sink.foldLeft("")(_ + (_: String)))
_ <- closeConn
_ <- console.putStrLn(s"Read data: ${data.mkString}") *>
clock.sleep(3.seconds) *>
console.putStrLn("Done")
} yield ()
}