From 3ebd46941507e7113d23e943ee2a5c2685826701 Mon Sep 17 00:00:00 2001 From: roelvanderpaal Date: Fri, 10 Apr 2020 17:25:26 +0200 Subject: [PATCH 1/6] simplified StreamsBasedServer code --- .../src/main/scala/StreamsBasedServer.scala | 73 ++++++++----------- 1 file changed, 31 insertions(+), 42 deletions(-) diff --git a/examples/src/main/scala/StreamsBasedServer.scala b/examples/src/main/scala/StreamsBasedServer.scala index e3d76be2..9e811efc 100644 --- a/examples/src/main/scala/StreamsBasedServer.scala +++ b/examples/src/main/scala/StreamsBasedServer.scala @@ -1,54 +1,43 @@ -package zio.nio.examples - import zio._ +import zio.clock.Clock +import zio.console.Console import zio.duration._ import zio.nio.core.SocketAddress import zio.nio.channels._ import zio.stream._ object StreamsBasedServer extends App { - - def run(args: List[String]) = - ZStream - .managed(server(8080)) - .flatMap(handleConnections(_) { chunk => - console.putStrLn(s"Read data: ${chunk.mkString}") *> - clock.sleep(2.seconds) *> - console.putStrLn("Done") - }) - .runDrain + override def run(args: List[String]) = + server(30303, 5) .orDie .as(0) - def server(port: Int): Managed[Exception, AsynchronousServerSocketChannel] = - for { - server <- AsynchronousServerSocketChannel() - socketAddress <- SocketAddress.inetSocketAddress(port).toManaged_ - _ <- server.bind(socketAddress).toManaged_ - } yield server + def server(port: Int, parallelism: Int): ZIO[ZEnv, Exception, Unit] = + AsynchronousServerSocketChannel() + .use(socket => + for { + _ <- SocketAddress.inetSocketAddress("localhost", port) >>= socket.bind + _ <- ZStream + .repeatEffect(socket.accept.preallocate) + .map(_.withEarlyRelease) + .mapMPar(parallelism)(_.use((doWork _).tupled)) + .runDrain + } yield ()) - def handleConnections[R <: console.Console]( - server: AsynchronousServerSocketChannel - )(f: String => RIO[R, Unit]): ZStream[R, Throwable, Unit] = - ZStream - .repeatEffect(server.accept.preallocate) - .map(conn => ZStream.managed(conn.ensuring(console.putStrLn("Connection closed")).withEarlyRelease)) - .flatMapPar[R, Throwable, Unit](16) { connection => - connection - .mapM { - case (closeConn, channel) => - for { - _ <- console.putStrLn("Received connection") - data <- ZStream - .fromEffectOption( - channel.read(64).tap(_ => console.putStrLn("Read chunk")).orElse(ZIO.fail(None)) - ) - .take(4) - .transduce(ZSink.utf8DecodeChunk) - .run(Sink.foldLeft("")(_ + (_: String))) - _ <- closeConn - _ <- f(data) - } yield () - } - } + + def doWork(closeConn: URIO[Any, Any], channel: AsynchronousSocketChannel): ZIO[Clock with Console, Nothing, Unit] = + for { + _ <- console.putStrLn("Received connection") + data <- ZStream + .fromEffectOption( + channel.read(64).tap(_ => console.putStrLn("Read chunk")).orElse(ZIO.fail(None)) + ) + .take(4) + .transduce(ZSink.utf8DecodeChunk) + .run(Sink.foldLeft("")(_ + (_: String))) + _ <- closeConn + _ <- console.putStrLn(s"Read data: ${data.mkString}") *> + clock.sleep(3.seconds) *> + console.putStrLn("Done") + } yield () } From 3396ab5f2b03c26a53ada054c2cdf28af2b6b699 Mon Sep 17 00:00:00 2001 From: roelvanderpaal Date: Fri, 10 Apr 2020 17:26:37 +0200 Subject: [PATCH 2/6] aded type annotation to run method --- examples/src/main/scala/StreamsBasedServer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/StreamsBasedServer.scala b/examples/src/main/scala/StreamsBasedServer.scala index 9e811efc..f7625110 100644 --- a/examples/src/main/scala/StreamsBasedServer.scala +++ b/examples/src/main/scala/StreamsBasedServer.scala @@ -7,7 +7,7 @@ import zio.nio.channels._ import zio.stream._ object StreamsBasedServer extends App { - override def run(args: List[String]) = + override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] = server(30303, 5) .orDie .as(0) From 5164ab1969b0e3f7d1abc90c70ed61802951e3a2 Mon Sep 17 00:00:00 2001 From: roelvanderpaal Date: Fri, 10 Apr 2020 17:38:08 +0200 Subject: [PATCH 3/6] fixed formatting --- .../src/main/scala/StreamsBasedServer.scala | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/examples/src/main/scala/StreamsBasedServer.scala b/examples/src/main/scala/StreamsBasedServer.scala index f7625110..9fef50f7 100644 --- a/examples/src/main/scala/StreamsBasedServer.scala +++ b/examples/src/main/scala/StreamsBasedServer.scala @@ -7,9 +7,9 @@ import zio.nio.channels._ import zio.stream._ object StreamsBasedServer extends App { + override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] = - server(30303, 5) - .orDie + server(30303, 5).orDie .as(0) def server(port: Int, parallelism: Int): ZIO[ZEnv, Exception, Unit] = @@ -18,26 +18,26 @@ object StreamsBasedServer extends App { for { _ <- SocketAddress.inetSocketAddress("localhost", port) >>= socket.bind _ <- ZStream - .repeatEffect(socket.accept.preallocate) - .map(_.withEarlyRelease) - .mapMPar(parallelism)(_.use((doWork _).tupled)) - .runDrain - } yield ()) - + .repeatEffect(socket.accept.preallocate) + .map(_.withEarlyRelease) + .mapMPar(parallelism)(_.use((doWork _).tupled)) + .runDrain + } yield () + ) def doWork(closeConn: URIO[Any, Any], channel: AsynchronousSocketChannel): ZIO[Clock with Console, Nothing, Unit] = for { _ <- console.putStrLn("Received connection") data <- ZStream - .fromEffectOption( - channel.read(64).tap(_ => console.putStrLn("Read chunk")).orElse(ZIO.fail(None)) - ) - .take(4) - .transduce(ZSink.utf8DecodeChunk) - .run(Sink.foldLeft("")(_ + (_: String))) + .fromEffectOption( + channel.read(64).tap(_ => console.putStrLn("Read chunk")).orElse(ZIO.fail(None)) + ) + .take(4) + .transduce(ZSink.utf8DecodeChunk) + .run(Sink.foldLeft("")(_ + (_: String))) _ <- closeConn _ <- console.putStrLn(s"Read data: ${data.mkString}") *> - clock.sleep(3.seconds) *> - console.putStrLn("Done") + clock.sleep(3.seconds) *> + console.putStrLn("Done") } yield () } From c7f3e53445018757556c5cf585c0ed11d5587727 Mon Sep 17 00:00:00 2001 From: roelvanderpaal Date: Fri, 10 Apr 2020 17:42:28 +0200 Subject: [PATCH 4/6] renamed work method --- examples/src/main/scala/StreamsBasedServer.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/examples/src/main/scala/StreamsBasedServer.scala b/examples/src/main/scala/StreamsBasedServer.scala index 9fef50f7..a40723a0 100644 --- a/examples/src/main/scala/StreamsBasedServer.scala +++ b/examples/src/main/scala/StreamsBasedServer.scala @@ -20,12 +20,15 @@ object StreamsBasedServer extends App { _ <- ZStream .repeatEffect(socket.accept.preallocate) .map(_.withEarlyRelease) - .mapMPar(parallelism)(_.use((doWork _).tupled)) + .mapMPar(parallelism)(_.use((handleChannel _).tupled)) .runDrain } yield () ) - def doWork(closeConn: URIO[Any, Any], channel: AsynchronousSocketChannel): ZIO[Clock with Console, Nothing, Unit] = + def handleChannel( + closeConn: URIO[Any, Any], + channel: AsynchronousSocketChannel + ): ZIO[Clock with Console, Nothing, Unit] = for { _ <- console.putStrLn("Received connection") data <- ZStream From 580f03d8519e2c3063867a4d18a9c92557fb1826 Mon Sep 17 00:00:00 2001 From: roelvanderpaal Date: Fri, 10 Apr 2020 17:44:51 +0200 Subject: [PATCH 5/6] changed port and parallelism to original values --- examples/src/main/scala/StreamsBasedServer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/scala/StreamsBasedServer.scala b/examples/src/main/scala/StreamsBasedServer.scala index a40723a0..acf008f7 100644 --- a/examples/src/main/scala/StreamsBasedServer.scala +++ b/examples/src/main/scala/StreamsBasedServer.scala @@ -9,7 +9,7 @@ import zio.stream._ object StreamsBasedServer extends App { override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] = - server(30303, 5).orDie + server(8080, 16).orDie .as(0) def server(port: Int, parallelism: Int): ZIO[ZEnv, Exception, Unit] = From 43279d31e6553ec7d7a42bc4d9a30b65e7213bbd Mon Sep 17 00:00:00 2001 From: roelvanderpaal Date: Wed, 21 Oct 2020 22:28:25 +0200 Subject: [PATCH 6/6] fixed compilation errors applied latest formatting --- .../src/main/scala/StreamsBasedServer.scala | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/examples/src/main/scala/StreamsBasedServer.scala b/examples/src/main/scala/StreamsBasedServer.scala index acf008f7..fae9b9ab 100644 --- a/examples/src/main/scala/StreamsBasedServer.scala +++ b/examples/src/main/scala/StreamsBasedServer.scala @@ -2,15 +2,15 @@ import zio._ import zio.clock.Clock import zio.console.Console import zio.duration._ -import zio.nio.core.SocketAddress import zio.nio.channels._ +import zio.nio.core.SocketAddress import zio.stream._ object StreamsBasedServer extends App { - override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] = + override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = server(8080, 16).orDie - .as(0) + .as(ExitCode.success) def server(port: Int, parallelism: Int): ZIO[ZEnv, Exception, Unit] = AsynchronousServerSocketChannel() @@ -18,10 +18,10 @@ object StreamsBasedServer extends App { for { _ <- SocketAddress.inetSocketAddress("localhost", port) >>= socket.bind _ <- ZStream - .repeatEffect(socket.accept.preallocate) - .map(_.withEarlyRelease) - .mapMPar(parallelism)(_.use((handleChannel _).tupled)) - .runDrain + .repeatEffect(socket.accept.preallocate) + .map(_.withEarlyRelease) + .mapMPar(parallelism)(_.use((handleChannel _).tupled)) + .runDrain } yield () ) @@ -30,17 +30,18 @@ object StreamsBasedServer extends App { channel: AsynchronousSocketChannel ): ZIO[Clock with Console, Nothing, Unit] = for { - _ <- console.putStrLn("Received connection") + _ <- console.putStrLn("Received connection") data <- ZStream - .fromEffectOption( - channel.read(64).tap(_ => console.putStrLn("Read chunk")).orElse(ZIO.fail(None)) - ) - .take(4) - .transduce(ZSink.utf8DecodeChunk) - .run(Sink.foldLeft("")(_ + (_: String))) - _ <- closeConn - _ <- console.putStrLn(s"Read data: ${data.mkString}") *> - clock.sleep(3.seconds) *> - console.putStrLn("Done") + .fromEffectOption( + channel.readChunk(64).tap(_ => console.putStrLn("Read chunk")).orElse(ZIO.fail(None)) + ) + .flattenChunks + .take(4) + .transduce(ZTransducer.utf8Decode) + .run(Sink.foldLeft("")(_ + (_: String))) + _ <- closeConn + _ <- console.putStrLn(s"Read data: ${data.mkString}") *> + clock.sleep(3.seconds) *> + console.putStrLn("Done") } yield () }