Skip to content

Commit

Permalink
Made chat app thread-safe.
Browse files Browse the repository at this point in the history
  • Loading branch information
林志宇 committed May 9, 2019
1 parent 561205f commit 83d0477
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 21 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
/*-config.json
/apps/
/skywire/
/local/
/local*

pkg/node/apps/
pkg/node/bar/
Expand Down
45 changes: 25 additions & 20 deletions cmd/apps/chat/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,25 @@ import (
var addr = flag.String("addr", ":8000", "address to bind")

var (
chatApp *app.App
clientChan chan string
chatConns map[cipher.PubKey]net.Conn
connsMu sync.Mutex
chatApp *app.App
clientCh chan string
chatConns map[cipher.PubKey]net.Conn
connsMu sync.Mutex
)

func main() {
flag.Parse()

var err error
config := &app.Config{AppName: "chat", AppVersion: "1.0", ProtocolVersion: "0.0.1"}
chatApp, err = app.Setup(config)
a, err := app.Setup(&app.Config{AppName: "chat", AppVersion: "1.0", ProtocolVersion: "0.0.1"})
if err != nil {
log.Fatal("Setup failure: ", err)
}
defer chatApp.Close()
defer func() { _ = a.Close() }()

chatApp = a

clientCh = make(chan string)
defer close(clientCh)

chatConns = make(map[cipher.PubKey]net.Conn)
go listenLoop()
Expand Down Expand Up @@ -78,7 +81,7 @@ func handleConn(conn net.Conn) {

clientMsg, _ := json.Marshal(map[string]string{"sender": raddr.PubKey.Hex(), "message": string(buf[:n])}) // nolint
select {
case clientChan <- string(clientMsg):
case clientCh <- string(clientMsg):
default:
}
}
Expand Down Expand Up @@ -130,21 +133,23 @@ func sseHandler(w http.ResponseWriter, req *http.Request) {
return
}

clientChan = make(chan string)

go func() {
<-req.Context().Done()
close(clientChan)
log.Println("SSE connection were closed.")
}()

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Transfer-Encoding", "chunked")

for msg := range clientChan {
fmt.Fprintf(w, "data: %s\n\n", msg)
f.Flush()
for {
select {
case msg, ok := <-clientCh:
if !ok {
return
}
_, _ = fmt.Fprintf(w, "data: %s\n\n", msg)
f.Flush()

case <-req.Context().Done():
log.Println("SSE connection were closed.")
return
}
}
}
3 changes: 3 additions & 0 deletions pkg/transport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ func (tm *Manager) CreateDefaultTransports(ctx context.Context) {

// Serve runs listening loop across all registered factories.
func (tm *Manager) Serve(ctx context.Context) error {
tm.ReconnectTransports(ctx)
tm.CreateDefaultTransports(ctx)

var wg sync.WaitGroup
for _, factory := range tm.factories {
wg.Add(1)
Expand Down

0 comments on commit 83d0477

Please sign in to comment.