Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

simplify StreamsBasedServer example #179

Merged
merged 8 commits into from
Oct 21, 2020
81 changes: 37 additions & 44 deletions examples/src/main/scala/StreamsBasedServer.scala
Original file line number Diff line number Diff line change
@@ -1,54 +1,47 @@
package zio.nio.examples

import zio._
import zio.clock.Clock
import zio.console.Console
import zio.duration._
import zio.nio.core.SocketAddress
import zio.nio.channels._
import zio.nio.core.SocketAddress
import zio.stream._

object StreamsBasedServer extends App {

def run(args: List[String]) =
ZStream
.managed(server(8080))
.flatMap(handleConnections(_) { chunk =>
console.putStrLn(s"Read data: ${chunk.mkString}") *>
clock.sleep(2.seconds) *>
console.putStrLn("Done")
})
.runDrain
.orDie
.exitCode
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
server(8080, 16).orDie
.as(ExitCode.success)

def server(port: Int): Managed[Exception, AsynchronousServerSocketChannel] =
for {
server <- AsynchronousServerSocketChannel()
socketAddress <- SocketAddress.inetSocketAddress(port).toManaged_
_ <- server.bind(socketAddress).toManaged_
} yield server
def server(port: Int, parallelism: Int): ZIO[ZEnv, Exception, Unit] =
AsynchronousServerSocketChannel()
.use(socket =>
for {
_ <- SocketAddress.inetSocketAddress("localhost", port) >>= socket.bind
_ <- ZStream
.repeatEffect(socket.accept.preallocate)
.map(_.withEarlyRelease)
.mapMPar(parallelism)(_.use((handleChannel _).tupled))
.runDrain
} yield ()
)

def handleConnections[R <: console.Console](
server: AsynchronousServerSocketChannel
)(f: String => RIO[R, Unit]): ZStream[R, Throwable, Unit] =
ZStream
.repeatEffect(server.accept.preallocate)
.map(conn => ZStream.managed(conn.ensuring(console.putStrLn("Connection closed")).withEarlyRelease))
.flatMapPar[R, Throwable, Unit](16) { connection =>
connection
.mapM { case (closeConn, channel) =>
for {
_ <- console.putStrLn("Received connection")
data <- ZStream
.fromEffectOption(
channel.readChunk(64).tap(_ => console.putStrLn("Read chunk")).orElse(ZIO.fail(None))
)
.flattenChunks
.take(4)
.transduce(ZTransducer.utf8Decode)
.run(Sink.foldLeft("")(_ + (_: String)))
_ <- closeConn
_ <- f(data)
} yield ()
}
}
def handleChannel(
closeConn: URIO[Any, Any],
channel: AsynchronousSocketChannel
): ZIO[Clock with Console, Nothing, Unit] =
for {
_ <- console.putStrLn("Received connection")
data <- ZStream
.fromEffectOption(
channel.readChunk(64).tap(_ => console.putStrLn("Read chunk")).orElse(ZIO.fail(None))
)
.flattenChunks
.take(4)
.transduce(ZTransducer.utf8Decode)
.run(Sink.foldLeft("")(_ + (_: String)))
_ <- closeConn
_ <- console.putStrLn(s"Read data: ${data.mkString}") *>
clock.sleep(3.seconds) *>
console.putStrLn("Done")
} yield ()
}