-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
75 lines (61 loc) · 2.48 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import asyncio
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from austingames.threeupthreedown.communication import Communicator
from austingames.threeupthreedown.game import Game
# ASGI application object; the route decorators below register handlers on it.
app = FastAPI()
# Expose the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")
# One module-level Game instance, read and mutated by every websocket
# connection handled in this process.
game = Game()
@app.get("/", response_class=HTMLResponse)
async def get() -> str:
    """Serve the HTML page for the game client.

    Returns:
        The full text of ``threeupthreedown.html``. The path is relative, so
        it is resolved against the process's current working directory.

    Raises:
        FileNotFoundError: If the HTML file is not present in the working
            directory.
    """
    # Decode explicitly as UTF-8 rather than the platform's locale encoding,
    # so the page is read the same way on every host.
    with open("threeupthreedown.html", encoding="utf-8") as f:
        return f.read()
async def _hold_connection_open() -> None:
    """Keep a websocket handler alive indefinitely without reading the socket.

    Returning from the handler would end the connection, so handlers that have
    nothing more to do themselves park here until they are cancelled.
    """
    # NOTE(review): this deliberately never calls receive on the socket;
    # presumably the game reads player input through the Communicator, and a
    # second reader here would compete with it -- confirm against Game.play().
    while True:
        await asyncio.sleep(1000)


@app.websocket("/ws/{player_name}")
async def websocket_endpoint(websocket: WebSocket, player_name: str) -> None:
    """Handle one player's websocket connection.

    The path taken depends on the shared ``game`` state:

    * A game is running and ``player_name`` is not one of its players: the
      client is told to try again later and the handler returns.
    * A game is running and ``player_name`` is already a player: this socket
      replaces that player's ``comms`` (a rejoin), the board and waiting
      prompt are re-broadcast, and the handler idles.
    * No game is running: the player is added to the lobby. The first player
      to join is the VIP; the VIP's first text message starts the game, which
      this handler then runs to completion before resetting. Every other
      player's handler idles.

    Args:
        websocket: The incoming websocket connection; accepted immediately.
        player_name: Name taken from the URL path, used as the key into
            ``game.players``.
    """
    await websocket.accept()
    comms = Communicator(websocket)
    try:
        # handle a game already in play
        if game.is_playing and player_name not in game.players:
            await comms.update_prompt("A game is being played. Please try again later.")
            return
        elif game.is_playing:
            # Rejoin: point the existing player at the new socket.
            game.players[player_name].comms = comms
            await game.broadcast_board()
            await game.broadcast_waiting_prompt()
            # play as non-VIP
            await _hold_connection_open()

        # game hasn't started yet; set it up
        is_vip = not game.players
        # Register the player before the first await below. Otherwise another
        # connection arriving while this one is suspended would also see an
        # empty player table and also be made VIP.
        game.add_player(name=player_name, is_vip=is_vip, comms=comms)
        if is_vip:
            await comms.update_prompt("Start the game when everyone has joined!")
            await comms.enable_vip_form()

        # Refresh the lobby roster on every connected player's board.
        current_players = "\n".join(
            name + " (VIP)" if player.is_vip else name
            for name, player in game.players.items()
        )
        for player in game.players.values():
            await player.comms.update_board(
                f"Waiting for VIP to start the game. Current players:\n\n{current_players}",
            )

        # wait for VIP to kick it off
        if is_vip:
            # Any text message from the VIP starts the game; its content is
            # not inspected.
            await websocket.receive_text()
            await comms.update_prompt("")
            win_msg = await game.play()
            for player in game.players.values():
                await player.comms.update_prompt(
                    win_msg + " Refresh page to start again :)"
                )
            game.reset_game()
        else:
            # play as non-VIP
            await _hold_connection_open()
    except WebSocketDisconnect:
        # Only drop the player while still in the lobby. Mid-game the entry is
        # kept so a reconnect under the same name takes the rejoin path above.
        if not game.is_playing:
            game.players.pop(player_name, None)
        print(f"{player_name} disconnected, waiting for them to come back")