diff --git a/pkg/app2/client_conn.go b/pkg/app2/client_conn.go new file mode 100644 index 000000000..5617d3070 --- /dev/null +++ b/pkg/app2/client_conn.go @@ -0,0 +1,23 @@ +package app2 + +import ( + "net" + + "github.com/skycoin/skywire/pkg/routing" +) + +// clientConn serves as a wrapper for `net.Conn` being returned to the +// app client side from `Accept` func +type clientConn struct { + remote routing.Addr + local routing.Addr + net.Conn +} + +func (c *clientConn) RemoteAddr() net.Addr { + return c.remote +} + +func (c *clientConn) LocalAddr() net.Addr { + return c.local +} diff --git a/pkg/app2/listener.go b/pkg/app2/listener.go index 98f997411..e062138c1 100644 --- a/pkg/app2/listener.go +++ b/pkg/app2/listener.go @@ -5,7 +5,6 @@ import ( "net" "github.com/skycoin/skycoin/src/util/logging" - "github.com/skycoin/skywire/pkg/routing" ) @@ -17,18 +16,9 @@ var ( ErrListenerClosed = errors.New("listener closed") ) -type acceptedConn struct { - remote routing.Addr - net.Conn -} - -func (c *acceptedConn) Addr() net.Addr { - return c.remote -} - type Listener struct { addr routing.Addr - conns chan *acceptedConn + conns chan *clientConn stopListening func(port routing.Port) error logger *logging.Logger lm *listenersManager @@ -39,7 +29,7 @@ func NewListener(addr routing.Addr, lm *listenersManager, procID ProcID, stopListening func(port routing.Port) error, l *logging.Logger) *Listener { return &Listener{ addr: addr, - conns: make(chan *acceptedConn, listenerBufSize), + conns: make(chan *clientConn, listenerBufSize), lm: lm, stopListening: stopListening, logger: l, @@ -83,6 +73,6 @@ func (l *Listener) Addr() net.Addr { return l.addr } -func (l *Listener) addConn(conn *acceptedConn) { +func (l *Listener) addConn(conn *clientConn) { l.conns <- conn } diff --git a/pkg/app2/listeners_manager.go b/pkg/app2/listeners_manager.go index a1c992535..1c1b7a7da 100644 --- a/pkg/app2/listeners_manager.go +++ b/pkg/app2/listeners_manager.go @@ -84,7 +84,7 @@ func (lm *listenersManager) addConn(localPort routing.Port, remote routing.Addr, lm.mx.RUnlock() return ErrNoListenerOnPort } - lm.listeners[localPort].addConn(&acceptedConn{ + lm.listeners[localPort].addConn(&clientConn{ remote: remote, Conn: conn, }) diff --git a/pkg/app2/server.go b/pkg/app2/server.go index 6b46889ed..3953266c0 100644 --- a/pkg/app2/server.go +++ b/pkg/app2/server.go @@ -21,7 +21,7 @@ import ( "github.com/skycoin/dmsg/cipher" ) -type clientConn struct { +type serverConn struct { procID ProcID conn net.Conn session *yamux.Session @@ -34,7 +34,7 @@ type clientConn struct { type Server struct { PK cipher.PubKey dmsgC *dmsg.Client - apps map[string]*clientConn + apps map[string]*serverConn appsMx sync.RWMutex logger *logging.Logger } @@ -43,7 +43,7 @@ func NewServer(localPK cipher.PubKey, dmsgC *dmsg.Client, l *logging.Logger) *Se return &Server{ PK: localPK, dmsgC: dmsgC, - apps: make(map[string]*clientConn), + apps: make(map[string]*serverConn), logger: l, } } @@ -70,7 +70,7 @@ func (s *Server) Serve(sockAddr string) error { return errors.Wrap(err, "error creating yamux session") } - s.apps[conn.RemoteAddr().String()] = &clientConn{ + s.apps[conn.RemoteAddr().String()] = &serverConn{ session: session, conn: conn, lm: newListenersManager(), @@ -83,7 +83,7 @@ func (s *Server) Serve(sockAddr string) error { } } -func (s *Server) serveClient(conn *clientConn) error { +func (s *Server) serveClient(conn *serverConn) error { for { stream, err := conn.session.Accept() if err != nil { @@ -116,7 +116,7 @@ func (s *Server) serveClient(conn *clientConn) error { } } -func (s *Server) serveStream(stream net.Conn, conn *clientConn) error { +func (s *Server) serveStream(stream net.Conn, conn *serverConn) error { for { hsFrame, err := readHSFrame(stream) if err != nil { @@ -209,7 +209,7 @@ func (s *Server) forwardOverDMSG(stream net.Conn, tp *dmsg.Transport) error { return nil } -func (c *clientConn) reserveListener(port routing.Port) error { +func (c *serverConn) reserveListener(port routing.Port) error { c.dmsgListenersMx.Lock() if _, ok := c.dmsgListeners[port]; ok { c.dmsgListenersMx.Unlock() @@ -220,7 +220,7 @@ func (c *clientConn) reserveListener(port routing.Port) error { return nil } -func (c *clientConn) addListener(port routing.Port, l *dmsg.Listener) error { +func (c *serverConn) addListener(port routing.Port, l *dmsg.Listener) error { c.dmsgListenersMx.Lock() if lis, ok := c.dmsgListeners[port]; ok && lis != nil { c.dmsgListenersMx.Unlock() @@ -232,7 +232,7 @@ func (c *clientConn) addListener(port routing.Port, l *dmsg.Listener) error { return nil } -func (c *clientConn) acceptDMSG(l *dmsg.Listener) error { +func (c *serverConn) acceptDMSG(l *dmsg.Listener) error { for { stream, err := c.session.Open() if err != nil {