Skip to content

Commit

Permalink
Merge remote-tracking branch 'dark/feature/skymsg-tests' into bug/fix…
Browse files Browse the repository at this point in the history
…-client-close-hangs
  • Loading branch information
林志宇 committed Jun 15, 2019
2 parents 06e061e + 44eb05f commit b648d5d
Show file tree
Hide file tree
Showing 9 changed files with 1,425 additions and 24 deletions.
14 changes: 12 additions & 2 deletions cmd/messaging-server/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"log"
"log/syslog"
"net"
"net/http"
"os"

Expand Down Expand Up @@ -71,9 +72,18 @@ var rootCmd = &cobra.Command{
}
}()

l, err := net.Listen("tcp", conf.LocalAddress)
if err != nil {
logger.Fatalf("Error listening on %s: %v", conf.LocalAddress, err)
}

// Start
srv := dmsg.NewServer(conf.PubKey, conf.SecKey, conf.PublicAddress, client.NewHTTP(conf.Discovery))
log.Fatal(srv.ListenAndServe(conf.LocalAddress))
srv, err := dmsg.NewServer(conf.PubKey, conf.SecKey, l, client.NewHTTP(conf.Discovery))
if err != nil {
logger.Fatalf("Error creating DMSG server instance: %v", err)
}

log.Fatal(srv.Serve())
},
}

Expand Down
50 changes: 34 additions & 16 deletions pkg/dmsg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/skycoin/skywire/pkg/messaging-discovery/client"
)

var ErrListenerAlreadyWrappedToNoise = errors.New("listener is already wrapped to *noise.Listener")

// NextConn provides information on the next connection.
type NextConn struct {
l *ServerConn
Expand Down Expand Up @@ -207,11 +209,11 @@ func (c *ServerConn) handleRequest(ctx context.Context, getLink getConnFunc, id
type Server struct {
log *logging.Logger

pk cipher.PubKey
sk cipher.SecKey
addr string
dc client.APIClient
pk cipher.PubKey
sk cipher.SecKey
dc client.APIClient

addr string
lis net.Listener
conns map[cipher.PubKey]*ServerConn
mx sync.RWMutex
Expand All @@ -220,22 +222,34 @@ type Server struct {
}

// NewServer creates a new dms_server.
func NewServer(pk cipher.PubKey, sk cipher.SecKey, addr string, dc client.APIClient) *Server {
func NewServer(pk cipher.PubKey, sk cipher.SecKey, l net.Listener, dc client.APIClient) (*Server, error) {
addr := l.Addr().String()

if _, ok := l.(*noise.Listener); ok {
return nil, ErrListenerAlreadyWrappedToNoise
}

return &Server{
log: logging.MustGetLogger("dms_server"),
pk: pk,
sk: sk,
addr: addr,
lis: noise.WrapListener(l, pk, sk, false, noise.HandshakeXK),
dc: dc,
conns: make(map[cipher.PubKey]*ServerConn),
}
}, nil
}

// SetLogger set's the logger.
func (s *Server) SetLogger(log *logging.Logger) {
s.log = log
}

// Addr returns the server's listening address.
func (s *Server) Addr() string {
return s.addr
}

func (s *Server) setConn(l *ServerConn) {
s.mx.Lock()
s.conns[l.remoteClient] = l
Expand All @@ -255,6 +269,13 @@ func (s *Server) getConn(pk cipher.PubKey) (*ServerConn, bool) {
return l, ok
}

func (s *Server) connsCount() int {
s.mx.RLock()
count := len(s.conns)
s.mx.RUnlock()
return count
}

// Close closes the dms_server.
func (s *Server) Close() (err error) {
defer func() {
Expand All @@ -275,23 +296,18 @@ func (s *Server) Close() (err error) {
}

// ListenAndServe serves the dms_server.
func (s *Server) ListenAndServe(addr string) error {
func (s *Server) Serve() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

lis, err := net.Listen("tcp", addr)
if err != nil {
return err
}
if err := s.retryUpdateEntry(ctx, hsTimeout); err != nil {
return fmt.Errorf("updating server's discovery entry failed with: %s", err)
}

s.log.Infof("serving: pk(%s) addr(%s)", s.pk, lis.Addr())
lis = noise.WrapListener(lis, s.pk, s.sk, false, noise.HandshakeXK)
s.lis = lis
s.log.Infof("serving: pk(%s) addr(%s)", s.pk, s.addr)

for {
rawConn, err := lis.Accept()
rawConn, err := s.lis.Accept()
if err != nil {
if err == io.ErrUnexpectedEOF {
continue
Expand Down Expand Up @@ -321,8 +337,10 @@ func (s *Server) updateDiscEntry(ctx context.Context) error {
}
return s.dc.SetEntry(ctx, entry)
}
entry.Server.Address = s.addr

entry.Server.Address = s.Addr()
s.log.Infoln("updatingEntry:", entry)

return s.dc.UpdateEntry(ctx, s.sk, entry)
}

Expand Down
Loading

0 comments on commit b648d5d

Please sign in to comment.