Skip to content

Commit

Permalink
Merge pull request #3 from vaslabs/feature/vectorClocks
Browse files Browse the repository at this point in the history
Server side vector clock support
  • Loading branch information
vaslabs authored Aug 7, 2020
2 parents dda78b7 + 89cf222 commit 721b579
Show file tree
Hide file tree
Showing 16 changed files with 310 additions and 61 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ lazy val endpoints =

lazy val engine =
(project in file("engine"))
.settings(libraryDependencies ++= Dependencies.Modules.protocol)
.settings(libraryDependencies ++= Dependencies.Modules.engine)
.settings(noPublishSettings)
.settings(compilerSettings)
.dependsOn(model)
Expand Down
12 changes: 7 additions & 5 deletions endpoints/src/main/scala/cardgame/endpoints/Actions.scala
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package cardgame.endpoints

import cardgame.model.{Event, GameId, PlayingGameAction}
import sttp.tapir._
import cardgame.json.circe._
import cardgame.model.{ClockedAction, ClockedResponse, GameId}
import sttp.model.StatusCode
import sttp.tapir._
import sttp.tapir.json.circe._
object Actions {

import cardgame.endpoints.codecs.ids._
import schema.java_types._
import schema.vector_clock._



val player =
endpoint.in("action" / path[GameId])
.in(jsonBody[PlayingGameAction])
.out(jsonBody[Event])
.in(jsonBody[ClockedAction])
.out(jsonBody[ClockedResponse])
.post
.errorOut(statusCode(StatusCode.NotFound))

Expand Down
6 changes: 3 additions & 3 deletions endpoints/src/main/scala/cardgame/endpoints/JoiningGame.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package cardgame.endpoints

import cardgame.model.{Event, GameId, PlayerId}
import cardgame.model.{ClockedResponse, GameId, PlayerId}
import sttp.model.StatusCode
import sttp.tapir._
import cardgame.json.circe._
Expand All @@ -9,13 +9,13 @@ import sttp.tapir.json.circe._
object JoiningGame {

import codecs.ids._
import schema.java_types._
import schema.vector_clock._

val joinPlayer =
endpoint.in(
"game" / path[GameId] / "join"
).post
.in(query[PlayerId]("username"))
.out(jsonBody[Event])
.out(jsonBody[ClockedResponse])
.errorOut(statusCode(StatusCode.NotFound))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package cardgame.endpoints.schema

import cardgame.endpoints.schema
import cardgame.model.{ClockedAction, ClockedResponse, Event, PlayingGameAction}
import sttp.tapir.SchemaType.SObjectInfo
import sttp.tapir.{Schema, SchemaType, Validator}

object vector_clock {
import schema.java_types._

implicit val actionSchema: Schema[PlayingGameAction] = Schema.derivedSchema[PlayingGameAction]
implicit val eventSchema: Schema[Event] = Schema.derivedSchema[Event]
implicit val mapSchema: Schema[Map[String, Long]] = Schema.schemaForMap

implicit val clockActionSchema: Schema[ClockedAction] = Schema(SchemaType.SCoproduct(
SObjectInfo("Clocked Action"), List(mapSchema, actionSchema), None
))

implicit val clockResponseSchema: Schema[ClockedResponse] = Schema(SchemaType.SCoproduct(
SObjectInfo("Clocked Response"), List(mapSchema, eventSchema), None
))

implicit val clockedResponseValidation: Validator[ClockedResponse] = Validator.pass

}
45 changes: 43 additions & 2 deletions endpoints/src/main/scala/cardgame/json/circe.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package cardgame.json

import java.net.URI

import cardgame.model.{CardId, DeckId, Event, Game, HiddenCard, PlayerId, PlayingGameAction}
import io.circe.{Codec, Decoder, Encoder, Json}
import cardgame.model.{CardId, ClockedAction, ClockedResponse, DeckId, Event, Game, HiddenCard, PlayerId, PlayingGameAction}
import io.circe.{Codec, Decoder, Encoder, Json, KeyDecoder, KeyEncoder}
import io.circe.generic.auto._
import io.circe.generic.semiauto._
import io.circe.syntax._
import cats.implicits._

import scala.util.Try
object circe {
Expand All @@ -28,6 +29,9 @@ object circe {
implicit val gameEncoder: Encoder[Game] = deriveEncoder
implicit val gameDecoder: Decoder[Game] = deriveDecoder

implicit val playerIdKeyEncoder: KeyEncoder[PlayerId] = KeyEncoder.encodeKeyString.contramap(_.value)
implicit val playerIdKeyDecoder: KeyDecoder[PlayerId] = KeyDecoder.decodeKeyString.map(PlayerId)

implicit val hiddenCardEncoder: Encoder[HiddenCard] =
Encoder.instance {
hc =>
Expand All @@ -36,4 +40,41 @@ object circe {

implicit val playingGameActionCodec: Codec[PlayingGameAction] =
deriveCodec

implicit val clockedResponseEncoder: Encoder[ClockedResponse] = Encoder.instance {
clockedResponse =>
clockedResponse.event.asJson.mapObject(_.add("vectorClock", clockedResponse.clock.asJson)
.add("serverClock", clockedResponse.serverClock.asJson)
)
}

implicit val clockedResponseDecoder: Decoder[ClockedResponse] = Decoder.instance {
hcursor =>
(
hcursor.downField("vectorClock").as[Map[String, Long]].orElse(Right(Map.empty[String, Long])),
hcursor.downField("serverClock").as[Long],
hcursor.as[Event]
).mapN((vectorClocks, serverClock, events) =>
ClockedResponse(events, vectorClocks, serverClock)
)
}

implicit val clockedActionEncoder: Encoder[ClockedAction] = Encoder.instance {
clockedAction =>
clockedAction.action.asJson.mapObject(_.add("vectorClock", clockedAction.vectorClock.asJson)
.add("serverClock", clockedAction.serverClock.asJson)
)

}

implicit val clockedActionDecoder: Decoder[ClockedAction] = Decoder.instance {
hcursor =>
(
hcursor.downField("vectorClock").as[Map[String, Long]].orElse(Right(Map.empty[String, Long])),
hcursor.downField("serverClock").as[Long].orElse(Right(0L)),
hcursor.as[PlayingGameAction]
).mapN(
(clock, serverClock, action) => ClockedAction(action, clock, serverClock)
)
}
}
23 changes: 16 additions & 7 deletions engine/src/main/scala/cardgame/engine/GameOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,13 @@ object GameOps {
}


def action(gameAction: Action, randomizer: IO[Int]): (Game, Event) = {
def action(
gameAction: Action,
randomizer: IO[Int],
isIdempotent: PlayerId => (RemoteClock, RemoteClock) => Boolean)(
oldClock: RemoteClock,
newClock: RemoteClock
): (Game, Event) = {
gameAction match {
case jg: JoinGame =>
join(jg.player)
Expand All @@ -48,12 +54,15 @@ object GameOps {
case StartGame(deck) =>
start(deck, randomizer)
case p: PlayingGameAction =>
game match {
case sg: StartedGame =>
sg.playingAction(p, randomizer)
case _ =>
game -> InvalidAction(p.player)
}
if (isIdempotent(p.player)(oldClock, newClock)) {
game match {
case sg: StartedGame =>
sg.playingAction(p, randomizer)
case _ =>
game -> InvalidAction(p.player)
}
} else
game -> OutOfSync(p.player)
case _ =>
game -> InvalidAction()
}
Expand Down
4 changes: 2 additions & 2 deletions engine/src/main/scala/cardgame/engine/GameState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ case class GameState(gameProgress: LazyList[Action], game: Game, randomizer: IO[
case _: FinishedGame =>
None
case playableGame =>
val (game, event) = playableGame.action(action, randomizer)
val (game, event) = playableGame.action(action, randomizer, _ => (_,_) => true)(RemoteClock.zero, RemoteClock.zero)
Some(event -> GameState(gameProgress.drop(1), game, randomizer))
}


def start =
def start: Seq[Event] =
LazyList.unfold[Event, GameState](this) {
gameState =>
gameState.gameProgress.headOption.flatMap { nextCommand =>
Expand Down
27 changes: 27 additions & 0 deletions model/src/main/scala/cardgame/model/Model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ sealed trait PlayingGameAction extends Action {
def player: PlayerId
}

case class ClockedAction(action: PlayingGameAction, vectorClock: Map[String, Long], serverClock: Long)

sealed trait MustHaveTurnAction extends PlayingGameAction
sealed trait FreeAction extends PlayingGameAction

Expand Down Expand Up @@ -169,6 +171,7 @@ case class NewDirection(direction: Direction) extends Event
case class PlayerLeft(player: PlayerId, nextCurrentPlayer: Int) extends Event
case class CardRecovered(player: PlayerId, card: Card) extends Event
case class InvalidAction(playerId: Option[PlayerId]) extends Event
case class OutOfSync(playerId: PlayerId) extends Event
case class DiceThrow(playerId: PlayerId, dice: List[Die]) extends Event
case class ShuffledHand(playerId: PlayerId, hand: List[Card]) extends Event

Expand All @@ -183,3 +186,27 @@ sealed trait GameCompleted extends Event

case class GameStopped() extends GameCompleted
case class GameFinished(winner: PlayerId) extends GameCompleted

case class ClockedResponse private (event: Event, clock: Map[String, Long], serverClock: Long)

object ClockedResponse {
def apply(event: Event, remoteClock: RemoteClock, serverClock: Long): ClockedResponse =
ClockedResponse(event, remoteClock.showMap, serverClock)
}

case class RemoteClock(vectorClock: Map[PlayerId, Long]) {
private[model] def showMap: Map[String, Long] = vectorClock.map {
case (key, value) => key.value -> value
}
}

object RemoteClock {

def zero = RemoteClock(Map.empty)
def of(clocks: Map[String, Long]) = RemoteClock(
clocks.map {
case (key, value) => PlayerId(key) -> value
}
)

}
28 changes: 15 additions & 13 deletions processor/src/main/scala/cardgame/processor/ActiveGames.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.util.UUID

import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior, Scheduler}
import cardgame.model.{DeckId, Event, Game, GameId, JoinGame, JoiningPlayer, PlayerId, PlayingGameAction, StartingGame}
import cardgame.model.{ClockedResponse, DeckId, Game, GameId, JoinGame, JoiningPlayer, PlayerId, PlayingGameAction, RemoteClock, StartingGame}
import cats.effect.IO
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout
Expand All @@ -21,7 +21,7 @@ object ActiveGames {
val randomizer = new Random(gameId.getMostSignificantBits)
val intRandomizer = IO.delay(randomizer.nextInt())
ctx.spawn(
GameProcessor.behavior(StartingGame(List.empty), intRandomizer),
GameProcessor.behavior(StartingGame(List.empty), intRandomizer, 0L, RemoteClock(Map.empty)),
gameId.toString
)
GameId(gameId)
Expand All @@ -34,9 +34,9 @@ object ActiveGames {
.map(_ ! GameProcessor.Get(playerId, replyTo))
.getOrElse(replyTo ! Left(()))
Behaviors.same
case (ctx, JoinExistingGame(gameId, playerId, replyTo)) =>
case (ctx, JoinExistingGame(gameId, playerId, remoteClock, replyTo)) =>
gameProcessor(ctx, gameId).map(
_ ! GameProcessor.RunCommand(replyTo, JoinGame(JoiningPlayer(playerId)))
_ ! GameProcessor.RunCommand(replyTo, JoinGame(JoiningPlayer(playerId)), remoteClock)
).getOrElse(replyTo ! Left(()))
Behaviors.same
case (ctx, LoadGame(token, gameId, deckId, server, replyTo)) =>
Expand All @@ -56,9 +56,9 @@ object ActiveGames {
)
Behaviors.same

case (ctx, DoGameAction(gameId, action, replyTo)) =>
case (ctx, DoGameAction(gameId, action, remoteClock, replyTo)) =>
gameProcessor(ctx, gameId).map(
_ ! GameProcessor.RunCommand(replyTo, action)
_ ! GameProcessor.RunCommand(replyTo, action, remoteClock)
).getOrElse(replyTo ! Left(()))
Behaviors.same
}
Expand Down Expand Up @@ -86,7 +86,8 @@ object ActiveGames {
case class JoinExistingGame(
gameId: GameId,
playerId: PlayerId,
replyTo: ActorRef[Either[Unit, Event]]
remoteClock: RemoteClock,
replyTo: ActorRef[Either[Unit, ClockedResponse]]
) extends Protocol

case class LoadGame(
Expand All @@ -100,7 +101,8 @@ object ActiveGames {
case class DoGameAction(
id: GameId,
action: PlayingGameAction,
replyTo: ActorRef[Either[Unit, Event]]
remoteClock: RemoteClock,
replyTo: ActorRef[Either[Unit, ClockedResponse]]
) extends Protocol


Expand All @@ -109,7 +111,7 @@ object ActiveGames {
implicit final class ActiveGamesOps(actorRef: ActorRef[Protocol]) {
type CreateGameRes = Either[Unit, GameId]
type GetGameRes = Either[Unit, Game]
type ActionRes = Either[Unit, Event]
type ActionRes = Either[Unit, ClockedResponse]

def createGame(authToken: String)(implicit
timeout: Timeout, scheduler: Scheduler): Future[CreateGameRes] =
Expand All @@ -119,17 +121,17 @@ object ActiveGames {
timeout: Timeout, scheduler: Scheduler): Future[GetGameRes] =
actorRef ? (GetGameStatus(gameId, playerId, _))

def joinGame(gameId: GameId, playerId: PlayerId)(implicit
def joinGame(gameId: GameId, playerId: PlayerId, remoteClock: RemoteClock)(implicit
timeout: Timeout, scheduler: Scheduler): Future[ActionRes] =
actorRef ? (JoinExistingGame(gameId, playerId, _))
actorRef ? (JoinExistingGame(gameId, playerId, remoteClock, _))

def startGame(token: String, gameId: GameId, deckId: DeckId, server: String)(implicit
timeout: Timeout, scheduler: Scheduler): Future[Either[Unit, Unit]] =
actorRef ? (LoadGame(token, gameId, deckId, server, _))

def action(gameId: GameId, action: PlayingGameAction)
def action(gameId: GameId, action: PlayingGameAction, remoteClock: RemoteClock)
(implicit timeout: Timeout, scheduler: Scheduler): Future[ActionRes] = {
actorRef ? (DoGameAction(gameId, action, _))
actorRef ? (DoGameAction(gameId, action, remoteClock, _))
}
}

Expand Down
4 changes: 2 additions & 2 deletions processor/src/main/scala/cardgame/processor/GameLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import java.util.UUID

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import cardgame.model.{CardId, Deck, DeckId, HiddenCard, StartGame, StartingRules}
import cardgame.model.{CardId, Deck, DeckId, HiddenCard, RemoteClock, StartGame, StartingRules}
import cardgame.processor.GameProcessor.FireAndForgetCommand
import cats.effect.{IO, Resource}

Expand All @@ -26,7 +26,7 @@ object GameLoader {
ctx.self ! DeckReady(loadDeck(deckId, server))
Behaviors.receiveMessage {
case DeckReady(deck) =>
game ! FireAndForgetCommand(StartGame(deck))
game ! FireAndForgetCommand(StartGame(deck), RemoteClock.zero)
replyTo ! Right(())
Behaviors.stopped
}
Expand Down
Loading

0 comments on commit 721b579

Please sign in to comment.