Skip to content

Commit

Permalink
simplify StreamsBasedServer example (#179)
Browse files Browse the repository at this point in the history
* simplified StreamsBasedServer code

* aded type annotation to run method

* fixed formatting

* renamed work method

* changed port and parallelism to original values

* fixed compilation errors
applied latest formatting

Co-authored-by: przemyslaw wierzbicki <[email protected]>
  • Loading branch information
RoelVanderPaal and pshemass authored Oct 21, 2020
1 parent 147cb24 commit c6ce460
Showing 1 changed file with 37 additions and 44 deletions.
81 changes: 37 additions & 44 deletions examples/src/main/scala/StreamsBasedServer.scala
Original file line number Diff line number Diff line change
@@ -1,54 +1,47 @@
package zio.nio.examples

import zio._
import zio.clock.Clock
import zio.console.Console
import zio.duration._
import zio.nio.core.SocketAddress
import zio.nio.channels._
import zio.nio.core.SocketAddress
import zio.stream._

object StreamsBasedServer extends App {

def run(args: List[String]) =
ZStream
.managed(server(8080))
.flatMap(handleConnections(_) { chunk =>
console.putStrLn(s"Read data: ${chunk.mkString}") *>
clock.sleep(2.seconds) *>
console.putStrLn("Done")
})
.runDrain
.orDie
.exitCode
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
server(8080, 16).orDie
.as(ExitCode.success)

def server(port: Int): Managed[Exception, AsynchronousServerSocketChannel] =
for {
server <- AsynchronousServerSocketChannel()
socketAddress <- SocketAddress.inetSocketAddress(port).toManaged_
_ <- server.bind(socketAddress).toManaged_
} yield server
def server(port: Int, parallelism: Int): ZIO[ZEnv, Exception, Unit] =
AsynchronousServerSocketChannel()
.use(socket =>
for {
_ <- SocketAddress.inetSocketAddress("localhost", port) >>= socket.bind
_ <- ZStream
.repeatEffect(socket.accept.preallocate)
.map(_.withEarlyRelease)
.mapMPar(parallelism)(_.use((handleChannel _).tupled))
.runDrain
} yield ()
)

def handleConnections[R <: console.Console](
server: AsynchronousServerSocketChannel
)(f: String => RIO[R, Unit]): ZStream[R, Throwable, Unit] =
ZStream
.repeatEffect(server.accept.preallocate)
.map(conn => ZStream.managed(conn.ensuring(console.putStrLn("Connection closed")).withEarlyRelease))
.flatMapPar[R, Throwable, Unit](16) { connection =>
connection
.mapM { case (closeConn, channel) =>
for {
_ <- console.putStrLn("Received connection")
data <- ZStream
.fromEffectOption(
channel.readChunk(64).tap(_ => console.putStrLn("Read chunk")).orElse(ZIO.fail(None))
)
.flattenChunks
.take(4)
.transduce(ZTransducer.utf8Decode)
.run(Sink.foldLeft("")(_ + (_: String)))
_ <- closeConn
_ <- f(data)
} yield ()
}
}
def handleChannel(
closeConn: URIO[Any, Any],
channel: AsynchronousSocketChannel
): ZIO[Clock with Console, Nothing, Unit] =
for {
_ <- console.putStrLn("Received connection")
data <- ZStream
.fromEffectOption(
channel.readChunk(64).tap(_ => console.putStrLn("Read chunk")).orElse(ZIO.fail(None))
)
.flattenChunks
.take(4)
.transduce(ZTransducer.utf8Decode)
.run(Sink.foldLeft("")(_ + (_: String)))
_ <- closeConn
_ <- console.putStrLn(s"Read data: ${data.mkString}") *>
clock.sleep(3.seconds) *>
console.putStrLn("Done")
} yield ()
}

0 comments on commit c6ce460

Please sign in to comment.