From f5986d5514798a9b26ccd681001f3a50f548b9fb Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Fri, 20 Sep 2019 17:26:09 +0300 Subject: [PATCH] Add networker stuff --- go.mod | 2 + go.sum | 10 + pkg/app2/client.go | 1 + pkg/app2/network/addr.go | 28 ++ pkg/app2/network/networker.go | 84 +++++ pkg/app2/network/type.go | 21 ++ vendor/github.com/skycoin/dmsg/client.go | 340 ++---------------- vendor/github.com/skycoin/dmsg/client_conn.go | 295 +++++++++++++++ vendor/github.com/skycoin/dmsg/listener.go | 124 +++++++ .../github.com/skycoin/dmsg/netutil/porter.go | 102 ++++++ .../github.com/skycoin/dmsg/port_manager.go | 66 ++++ vendor/github.com/skycoin/dmsg/server.go | 232 ------------ vendor/github.com/skycoin/dmsg/server_conn.go | 243 +++++++++++++ vendor/github.com/skycoin/dmsg/testing.go | 11 +- vendor/github.com/skycoin/dmsg/transport.go | 57 +-- .../skycoin/dmsg/{frame.go => types.go} | 79 +++- vendor/modules.txt | 3 +- 17 files changed, 1122 insertions(+), 576 deletions(-) create mode 100644 pkg/app2/network/addr.go create mode 100644 pkg/app2/network/networker.go create mode 100644 pkg/app2/network/type.go create mode 100644 vendor/github.com/skycoin/dmsg/client_conn.go create mode 100644 vendor/github.com/skycoin/dmsg/listener.go create mode 100644 vendor/github.com/skycoin/dmsg/netutil/porter.go create mode 100644 vendor/github.com/skycoin/dmsg/port_manager.go create mode 100644 vendor/github.com/skycoin/dmsg/server_conn.go rename vendor/github.com/skycoin/dmsg/{frame.go => types.go} (66%) diff --git a/go.mod b/go.mod index e7947d1c40..c0cfa4c314 100644 --- a/go.mod +++ b/go.mod @@ -26,3 +26,5 @@ require ( golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7 golang.org/x/net v0.0.0-20190916140828-c8589233b77d ) + +replace github.com/skycoin/dmsg => ../dmsg diff --git a/go.sum b/go.sum index 13c080a752..b9ca83793d 100644 --- a/go.sum +++ b/go.sum @@ -58,10 +58,13 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxv github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= @@ -101,14 +104,18 @@ github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURm github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/skycoin/dmsg v0.0.0-20190805065636-70f4c32a994f h1:WWjaxOXoj6oYelm67MNtJbg51HQALjKAyhs2WAHgpZs= github.com/skycoin/dmsg v0.0.0-20190805065636-70f4c32a994f/go.mod h1:obZYZp8eKR7Xqz+KNhJdUE6Gvp6rEXbDO8YTlW2YXgU= +github.com/skycoin/skycoin v0.25.1/go.mod h1:78nHjQzd8KG0jJJVL/j0xMmrihXi70ti63fh8vXScJw= github.com/skycoin/skycoin v0.26.0 h1:xDxe2r8AclMntZ550Y/vUQgwgLtwrf9Wu5UYiYcN5/o= github.com/skycoin/skycoin v0.26.0/go.mod h1:78nHjQzd8KG0jJJVL/j0xMmrihXi70ti63fh8vXScJw= +github.com/skycoin/skywire v0.1.1/go.mod h1:jDuUgTG20jhiBI6Trpayj0my6xhdS+ejEO9gTSM+C/E= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/cobra v0.0.5 h1:f0B+LkLX6DtmRH1isoNA9VTtNUK9K8xYd28JNNfOv/s= github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= @@ -124,16 +131,19 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7 h1:0hQKqeLdqlt5iIwVOBErRisrHJAN57yOiPRQItI20fU= golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190313220215-9f648a60d977/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= diff --git a/pkg/app2/client.go b/pkg/app2/client.go index 3d302c3024..53fd6b0088 100644 --- a/pkg/app2/client.go +++ b/pkg/app2/client.go @@ -37,6 +37,7 @@ func (c *Client) Dial(remote routing.Addr) (net.Conn, error) { if err != nil { return nil, err } + net.Dial() connID, err := c.rpc.Dial(remote) if err != nil { diff --git a/pkg/app2/network/addr.go b/pkg/app2/network/addr.go new file mode 100644 index 0000000000..36fec8306d --- /dev/null +++ b/pkg/app2/network/addr.go @@ -0,0 +1,28 @@ +package network + +import ( + "fmt" + + "github.com/skycoin/dmsg/cipher" + "github.com/skycoin/skywire/pkg/routing" +) + +// Addr implements net.Addr for network addresses. +type Addr struct { + Net Type + PubKey cipher.PubKey + Port routing.Port +} + +// Network returns "dmsg" +func (a Addr) Network() string { + return string(a.Net) +} + +// String returns public key and port of node split by colon. +func (a Addr) String() string { + if a.Port == 0 { + return fmt.Sprintf("%s:~", a.PubKey) + } + return fmt.Sprintf("%s:%d", a.PubKey, a.Port) +} diff --git a/pkg/app2/network/networker.go b/pkg/app2/network/networker.go new file mode 100644 index 0000000000..34b320f4b7 --- /dev/null +++ b/pkg/app2/network/networker.go @@ -0,0 +1,84 @@ +package network + +import ( + "context" + "errors" + "net" + "sync" +) + +var ( + // ErrNoSuchNetworker is being returned when there's no suitable networker. + ErrNoSuchNetworker = errors.New("no such networker") + // ErrNetworkerAlreadyExists is being returned when there's already one with such Network type. + ErrNetworkerAlreadyExists = errors.New("networker already exists") +) + +var ( + networkers = map[Type]Networker{} + networkersMx sync.RWMutex +) + +// AddNetworker associated Networker with the `network`. +func AddNetworker(t Type, n Networker) error { + networkersMx.Lock() + defer networkersMx.Unlock() + + if _, ok := networkers[t]; ok { + return ErrNetworkerAlreadyExists + } + + networkers[t] = n + + return nil +} + +// ResolveNetworker resolves Networker by `network`. +func ResolveNetworker(t Type) (Networker, error) { + networkersMx.RLock() + n, ok := networkers[t] + if !ok { + networkersMx.RUnlock() + return nil, ErrNoSuchNetworker + } + networkersMx.RUnlock() + return n, nil +} + +// Networker defines basic network operations, such as Dial/Listen. +type Networker interface { + Dial(addr Addr) (net.Conn, error) + DialContext(ctx context.Context, addr Addr) (net.Conn, error) + Listen(addr Addr) (net.Listener, error) + ListenContext(ctx context.Context, addr Addr) (net.Listener, error) +} + +// Dial dials the remote `addr` of the specified `network`. +func Dial(t Type, addr Addr) (net.Conn, error) { + return DialContext(context.Background(), t, addr) +} + +// DialContext dials the remote `Addr` of the specified `network` with the context. +func DialContext(ctx context.Context, t Type, addr Addr) (net.Conn, error) { + n, err := ResolveNetworker(t) + if err != nil { + return nil, err + } + + return n.DialContext(ctx, addr) +} + +// Listen starts listening on the local `addr` of the specified `network`. +func Listen(t Type, addr Addr) (net.Listener, error) { + return ListenContext(context.Background(), t, addr) +} + +// ListenContext starts listening on the local `addr` of the specified `network` with the context. +func ListenContext(ctx context.Context, t Type, addr Addr) (net.Listener, error) { + networker, err := ResolveNetworker(t) + if err != nil { + return nil, err + } + + return networker.ListenContext(ctx, addr) +} diff --git a/pkg/app2/network/type.go b/pkg/app2/network/type.go new file mode 100644 index 0000000000..d91c6a0dcc --- /dev/null +++ b/pkg/app2/network/type.go @@ -0,0 +1,21 @@ +package network + +// Type represents the network type. +type Type string + +const ( + // TypeDMSG is a network type for DMSG communication. + TypeDMSG = "dmsg" +) + +// IsValid checks whether the network contains valid value for the type. +func (n Type) IsValid() bool { + _, ok := validNetworks[n] + return ok +} + +var ( + validNetworks = map[Type]struct{}{ + TypeDMSG: {}, + } +) diff --git a/vendor/github.com/skycoin/dmsg/client.go b/vendor/github.com/skycoin/dmsg/client.go index 8e6a20fede..f09ffefde9 100644 --- a/vendor/github.com/skycoin/dmsg/client.go +++ b/vendor/github.com/skycoin/dmsg/client.go @@ -8,7 +8,6 @@ import ( "sync" "time" - "github.com/sirupsen/logrus" "github.com/skycoin/skycoin/src/util/logging" "github.com/skycoin/dmsg/cipher" @@ -31,270 +30,6 @@ var ( ErrClientAcceptMaxed = errors.New("client accepts buffer maxed") ) -// ClientConn represents a connection between a dmsg.Client and dmsg.Server from a client's perspective. -type ClientConn struct { - log *logging.Logger - - net.Conn // conn to dmsg server - local cipher.PubKey // local client's pk - remoteSrv cipher.PubKey // dmsg server's public key - - // nextInitID keeps track of unused tp_ids to assign a future locally-initiated tp. - // locally-initiated tps use an even tp_id between local and intermediary dms_server. - nextInitID uint16 - - // Transports: map of transports to remote dms_clients (key: tp_id, val: transport). - tps map[uint16]*Transport - mx sync.RWMutex // to protect tps - - done chan struct{} - once sync.Once - wg sync.WaitGroup -} - -// NewClientConn creates a new ClientConn. -func NewClientConn(log *logging.Logger, conn net.Conn, local, remote cipher.PubKey) *ClientConn { - cc := &ClientConn{ - log: log, - Conn: conn, - local: local, - remoteSrv: remote, - nextInitID: randID(true), - tps: make(map[uint16]*Transport), - done: make(chan struct{}), - } - cc.wg.Add(1) - return cc -} - -// RemotePK returns the remote Server's PK that the ClientConn is connected to. -func (c *ClientConn) RemotePK() cipher.PubKey { return c.remoteSrv } - -func (c *ClientConn) getNextInitID(ctx context.Context) (uint16, error) { - for { - select { - case <-c.done: - return 0, ErrClientClosed - case <-ctx.Done(): - return 0, ctx.Err() - default: - if ch := c.tps[c.nextInitID]; ch != nil && !ch.IsClosed() { - c.nextInitID += 2 - continue - } - c.tps[c.nextInitID] = nil - id := c.nextInitID - c.nextInitID = id + 2 - return id, nil - } - } -} - -func (c *ClientConn) addTp(ctx context.Context, clientPK cipher.PubKey) (*Transport, error) { - c.mx.Lock() - defer c.mx.Unlock() - - id, err := c.getNextInitID(ctx) - if err != nil { - return nil, err - } - tp := NewTransport(c.Conn, c.log, c.local, clientPK, id, c.delTp) - c.tps[id] = tp - return tp, nil -} - -func (c *ClientConn) setTp(tp *Transport) { - c.mx.Lock() - c.tps[tp.id] = tp - c.mx.Unlock() -} - -func (c *ClientConn) delTp(id uint16) { - c.mx.Lock() - c.tps[id] = nil - c.mx.Unlock() -} - -func (c *ClientConn) getTp(id uint16) (*Transport, bool) { - c.mx.RLock() - tp := c.tps[id] - c.mx.RUnlock() - ok := tp != nil && !tp.IsClosed() - return tp, ok -} - -func (c *ClientConn) setNextInitID(nextInitID uint16) { - c.mx.Lock() - c.nextInitID = nextInitID - c.mx.Unlock() -} - -func (c *ClientConn) readOK() error { - fr, err := readFrame(c.Conn) - if err != nil { - return errors.New("failed to get OK from server") - } - - ft, _, _ := fr.Disassemble() - if ft != OkType { - return fmt.Errorf("wrong frame from server: %v", ft) - } - - return nil -} - -func (c *ClientConn) handleRequestFrame(accept chan<- *Transport, id uint16, p []byte) (cipher.PubKey, error) { - // remotely-initiated tps should: - // - have a payload structured as 'init_pk:resp_pk'. - // - resp_pk should be of local client. - // - use an odd tp_id with the intermediary dmsg_server. - initPK, respPK, ok := splitPKs(p) - if !ok || respPK != c.local || isInitiatorID(id) { - if err := writeCloseFrame(c.Conn, id, 0); err != nil { - return initPK, err - } - return initPK, ErrRequestCheckFailed - } - - tp := NewTransport(c.Conn, c.log, c.local, initPK, id, c.delTp) - - select { - case <-c.done: - if err := tp.Close(); err != nil { - log.WithError(err).Warn("Failed to close transport") - } - return initPK, ErrClientClosed - default: - select { - case accept <- tp: - c.setTp(tp) - if err := tp.WriteAccept(); err != nil { - return initPK, err - } - go tp.Serve() - return initPK, nil - - default: - if err := tp.Close(); err != nil { - log.WithError(err).Warn("Failed to close transport") - } - return initPK, ErrClientAcceptMaxed - } - } -} - -// Serve handles incoming frames. -// Remote-initiated tps that are successfully created are pushing into 'accept' and exposed via 'Client.Accept()'. -func (c *ClientConn) Serve(ctx context.Context, accept chan<- *Transport) (err error) { - log := c.log.WithField("remoteServer", c.remoteSrv) - log.WithField("connCount", incrementServeCount()).Infoln("ServingConn") - defer func() { - c.close() - log.WithError(err).WithField("connCount", decrementServeCount()).Infoln("ConnectionClosed") - c.wg.Done() - }() - - for { - f, err := readFrame(c.Conn) - if err != nil { - return fmt.Errorf("read failed: %s", err) - } - log = log.WithField("received", f) - - ft, id, p := f.Disassemble() - - // If tp of tp_id exists, attempt to forward frame to tp. - // delete tp on any failure. - - if tp, ok := c.getTp(id); ok { - if err := tp.HandleFrame(f); err != nil { - log.WithError(err).Warnf("Rejected [%s]: Transport closed.", ft) - } - continue - } - - // if tp does not exist, frame should be 'REQUEST'. - // otherwise, handle any unexpected frames accordingly. - - c.delTp(id) // rm tp in case closed tp is not fully removed. - - switch ft { - case RequestType: - c.wg.Add(1) - go func(log *logrus.Entry) { - defer c.wg.Done() - initPK, err := c.handleRequestFrame(accept, id, p) - if err != nil { - log.WithField("remoteClient", initPK).WithError(err).Infoln("Rejected [REQUEST]") - if isWriteError(err) || err == ErrClientClosed { - err := c.Close() - log.WithError(err).Warn("ClosingConnection") - } - return - } - log.WithField("remoteClient", initPK).Infoln("Accepted [REQUEST]") - }(log) - - default: - log.Debugf("Ignored [%s]: No transport of given ID.", ft) - if ft != CloseType { - if err := writeCloseFrame(c.Conn, id, 0); err != nil { - return err - } - } - } - } -} - -// DialTransport dials a transport to remote dms_client. -func (c *ClientConn) DialTransport(ctx context.Context, clientPK cipher.PubKey) (*Transport, error) { - tp, err := c.addTp(ctx, clientPK) - if err != nil { - return nil, err - } - if err := tp.WriteRequest(); err != nil { - return nil, err - } - if err := tp.ReadAccept(ctx); err != nil { - return nil, err - } - go tp.Serve() - return tp, nil -} - -func (c *ClientConn) close() (closed bool) { - if c == nil { - return false - } - c.once.Do(func() { - closed = true - c.log.WithField("remoteServer", c.remoteSrv).Infoln("ClosingConnection") - close(c.done) - c.mx.Lock() - for _, tp := range c.tps { - tp := tp - go func() { - if err := tp.Close(); err != nil { - log.WithError(err).Warn("Failed to close transport") - } - }() - } - if err := c.Conn.Close(); err != nil { - log.WithError(err).Warn("Failed to close connection") - } - c.mx.Unlock() - }) - return closed -} - -// Close closes the connection to dms_server. -func (c *ClientConn) Close() error { - if c.close() { - c.wg.Wait() - } - return nil -} - // ClientOption represents an optional argument for Client. type ClientOption func(c *Client) error @@ -320,21 +55,22 @@ type Client struct { conns map[cipher.PubKey]*ClientConn // conns with messaging servers. Key: pk of server mx sync.RWMutex - accept chan *Transport - done chan struct{} - once sync.Once + pm *PortManager + + done chan struct{} + once sync.Once } // NewClient creates a new Client. func NewClient(pk cipher.PubKey, sk cipher.SecKey, dc disc.APIClient, opts ...ClientOption) *Client { c := &Client{ - log: logging.MustGetLogger("dmsg_client"), - pk: pk, - sk: sk, - dc: dc, - conns: make(map[cipher.PubKey]*ClientConn), - accept: make(chan *Transport, AcceptBufferSize), - done: make(chan struct{}), + log: logging.MustGetLogger("dmsg_client"), + pk: pk, + sk: sk, + dc: dc, + conns: make(map[cipher.PubKey]*ClientConn), + pm: newPortManager(pk), + done: make(chan struct{}), } for _, opt := range opts { if err := opt(c); err != nil { @@ -364,7 +100,7 @@ func (c *Client) updateDiscEntry(ctx context.Context) error { func (c *Client) setConn(ctx context.Context, conn *ClientConn) { c.mx.Lock() - c.conns[conn.remoteSrv] = conn + c.conns[conn.srvPK] = conn if err := c.updateDiscEntry(ctx); err != nil { c.log.WithError(err).Warn("updateEntry: failed") } @@ -403,7 +139,7 @@ func (c *Client) InitiateServerConnections(ctx context.Context, min int) error { if err != nil { return err } - c.log.Info("found dms_server entries:", entries) + c.log.Info("found dmsg.Server entries:", entries) if err := c.findOrConnectToServers(ctx, entries, min); err != nil { return err } @@ -419,7 +155,7 @@ func (c *Client) findServerEntries(ctx context.Context) ([]*disc.Entry, error) { return nil, fmt.Errorf("dms_servers are not available: %s", err) default: retry := time.Second - c.log.WithError(err).Warnf("no dms_servers found: trying again in %d second...", retry) + c.log.WithError(err).Warnf("no dms_servers found: trying again in %v...", retry) time.Sleep(retry) continue } @@ -474,7 +210,7 @@ func (c *Client) findOrConnectToServer(ctx context.Context, srvPK cipher.PubKey) return nil, err } - conn := NewClientConn(c.log, nc, c.pk, srvPK) + conn := NewClientConn(c.log, c.pm, nc, c.pk, srvPK) if err := conn.readOK(); err != nil { return nil, err } @@ -482,7 +218,7 @@ func (c *Client) findOrConnectToServer(ctx context.Context, srvPK cipher.PubKey) c.setConn(ctx, conn) go func() { - err := conn.Serve(ctx, c.accept) + err := conn.Serve(ctx) conn.log.WithError(err).WithField("remoteServer", srvPK).Warn("connected with server closed") c.delConn(ctx, srvPK) @@ -503,23 +239,17 @@ func (c *Client) findOrConnectToServer(ctx context.Context, srvPK cipher.PubKey) return conn, nil } -// Accept accepts remotely-initiated tps. -func (c *Client) Accept(ctx context.Context) (*Transport, error) { - select { - case tp, ok := <-c.accept: - if !ok { - return nil, ErrClientClosed - } - return tp, nil - case <-c.done: - return nil, ErrClientClosed - case <-ctx.Done(): - return nil, ctx.Err() +// Listen creates a listener on a given port, adds it to port manager and returns the listener. +func (c *Client) Listen(port uint16) (*Listener, error) { + l, ok := c.pm.NewListener(port) + if !ok { + return nil, errors.New("port is busy") } + return l, nil } // Dial dials a transport to remote dms_client. -func (c *Client) Dial(ctx context.Context, remote cipher.PubKey) (*Transport, error) { +func (c *Client) Dial(ctx context.Context, remote cipher.PubKey, port uint16) (*Transport, error) { entry, err := c.dc.Entry(ctx, remote) if err != nil { return nil, fmt.Errorf("get entry failure: %s", err) @@ -536,14 +266,16 @@ func (c *Client) Dial(ctx context.Context, remote cipher.PubKey) (*Transport, er c.log.WithError(err).Warn("failed to connect to server") continue } - return conn.DialTransport(ctx, remote) + return conn.DialTransport(ctx, remote, port) } return nil, errors.New("failed to find dms_servers for given client pk") } -// Local returns the local dms_client's public key. -func (c *Client) Local() cipher.PubKey { - return c.pk +// Addr returns the local dms_client's public key. +func (c *Client) Addr() net.Addr { + return Addr{ + PK: c.pk, + } } // Type returns the transport type. @@ -553,7 +285,7 @@ func (c *Client) Type() string { // Close closes the dms_client and associated connections. // TODO(evaninjin): proper error handling. -func (c *Client) Close() error { +func (c *Client) Close() (err error) { if c == nil { return nil } @@ -570,14 +302,8 @@ func (c *Client) Close() error { c.conns = make(map[cipher.PubKey]*ClientConn) c.mx.Unlock() - for { - select { - case <-c.accept: - default: - close(c.accept) - return - } - } + err = c.pm.Close() }) - return nil + + return err } diff --git a/vendor/github.com/skycoin/dmsg/client_conn.go b/vendor/github.com/skycoin/dmsg/client_conn.go new file mode 100644 index 0000000000..be48e6adbb --- /dev/null +++ b/vendor/github.com/skycoin/dmsg/client_conn.go @@ -0,0 +1,295 @@ +package dmsg + +import ( + "context" + "errors" + "fmt" + "net" + "sync" + + "github.com/sirupsen/logrus" + "github.com/skycoin/skycoin/src/util/logging" + + "github.com/skycoin/dmsg/cipher" +) + +// ClientConn represents a connection between a dmsg.Client and dmsg.Server from a client's perspective. +type ClientConn struct { + log *logging.Logger + + net.Conn // conn to dmsg server + lPK cipher.PubKey // local client's pk + srvPK cipher.PubKey // dmsg server's public key + + // nextInitID keeps track of unused tp_ids to assign a future locally-initiated tp. + // locally-initiated tps use an even tp_id between local and intermediary dms_server. + nextInitID uint16 + + // Transports: map of transports to remote dms_clients (key: tp_id, val: transport). + tps map[uint16]*Transport + mx sync.RWMutex // to protect tps + + pm *PortManager + + done chan struct{} + once sync.Once + wg sync.WaitGroup +} + +// NewClientConn creates a new ClientConn. +func NewClientConn(log *logging.Logger, pm *PortManager, conn net.Conn, lPK, rPK cipher.PubKey) *ClientConn { + cc := &ClientConn{ + log: log, + Conn: conn, + lPK: lPK, + srvPK: rPK, + nextInitID: randID(true), + tps: make(map[uint16]*Transport), + pm: pm, + done: make(chan struct{}), + } + cc.wg.Add(1) + return cc +} + +// RemotePK returns the remote Server's PK that the ClientConn is connected to. +func (c *ClientConn) RemotePK() cipher.PubKey { return c.srvPK } + +func (c *ClientConn) getNextInitID(ctx context.Context) (uint16, error) { + for { + select { + case <-c.done: + return 0, ErrClientClosed + case <-ctx.Done(): + return 0, ctx.Err() + default: + if ch := c.tps[c.nextInitID]; ch != nil && !ch.IsClosed() { + c.nextInitID += 2 + continue + } + c.tps[c.nextInitID] = nil + id := c.nextInitID + c.nextInitID = id + 2 + return id, nil + } + } +} + +func (c *ClientConn) addTp(ctx context.Context, rPK cipher.PubKey, lPort, rPort uint16, closeCB func()) (*Transport, error) { + c.mx.Lock() + defer c.mx.Unlock() + + id, err := c.getNextInitID(ctx) + if err != nil { + return nil, err + } + tp := NewTransport(c.Conn, c.log, Addr{c.lPK, lPort}, Addr{rPK, rPort}, id, func() { + c.delTp(id) + closeCB() + }) + c.tps[id] = tp + return tp, nil +} + +func (c *ClientConn) setTp(tp *Transport) { + c.mx.Lock() + c.tps[tp.id] = tp + c.mx.Unlock() +} + +func (c *ClientConn) delTp(id uint16) { + c.mx.Lock() + c.tps[id] = nil + c.mx.Unlock() +} + +func (c *ClientConn) getTp(id uint16) (*Transport, bool) { + c.mx.RLock() + tp := c.tps[id] + c.mx.RUnlock() + ok := tp != nil && !tp.IsClosed() + return tp, ok +} + +func (c *ClientConn) setNextInitID(nextInitID uint16) { + c.mx.Lock() + c.nextInitID = nextInitID + c.mx.Unlock() +} + +func (c *ClientConn) readOK() error { + _, df, err := readFrame(c.Conn) + if err != nil { + return errors.New("failed to get OK from server") + } + if df.Type != OkType { + return fmt.Errorf("wrong frame from server: %v", df.Type) + } + return nil +} + +// This handles 'REQUEST' frames which represent remotely-initiated tps. 'REQUEST' frames should: +// - have a HandshakePayload marshaled to JSON as payload. +// - have a resp_pk be of local client. +// - have an odd tp_id. +func (c *ClientConn) handleRequestFrame(log *logrus.Entry, id uint16, p []byte) (cipher.PubKey, error) { + + // The public key of the initiating client (or the client that sent the 'REQUEST' frame). + var initPK cipher.PubKey + + // Attempts to close tp due to given error. + // When we fail to close tp (a.k.a fail to send 'CLOSE' frame) or if the local client is closed, + // the connection to server should be closed. + // TODO(evanlinjin): derive close reason from error. + closeTp := func(origErr error) (cipher.PubKey, error) { + if err := writeCloseFrame(c.Conn, id, PlaceholderReason); err != nil { + log.WithError(err).Warn("handleRequestFrame: failed to close transport: ending conn to server.") + log.WithError(c.Close()).Warn("handleRequestFrame: closing connection to server.") + return initPK, origErr + } + switch origErr { + case ErrClientClosed: + log.WithError(c.Close()).Warn("handleRequestFrame: closing connection to server.") + } + return initPK, origErr + } + + pay, err := unmarshalHandshakePayload(p) + if err != nil { + return closeTp(ErrRequestCheckFailed) // TODO(nkryuchkov): reason = payload format is incorrect. + } + initPK = pay.InitAddr.PK + + if pay.RespAddr.PK != c.lPK || isInitiatorID(id) { + return closeTp(ErrRequestCheckFailed) // TODO(nkryuchkov): reason = payload is malformed. + } + lis, ok := c.pm.Listener(pay.RespAddr.Port) + if !ok { + return closeTp(ErrPortNotListening) // TODO(nkryuchkov): reason = port is not listening. + } + if c.isClosed() { + return closeTp(ErrClientClosed) // TODO(nkryuchkov): reason = client is closed. + } + + tp := NewTransport(c.Conn, c.log, pay.RespAddr, pay.InitAddr, id, func() { c.delTp(id) }) + if err := lis.IntroduceTransport(tp); err != nil { + return initPK, err + } + c.setTp(tp) + return initPK, nil +} + +// Serve handles incoming frames. +// Remote-initiated tps that are successfully created are pushing into 'accept' and exposed via 'Client.Accept()'. +func (c *ClientConn) Serve(ctx context.Context) (err error) { + log := c.log.WithField("remoteServer", c.srvPK) + log.WithField("connCount", incrementServeCount()).Infoln("ServingConn") + defer func() { + c.close() + log.WithError(err).WithField("connCount", decrementServeCount()).Infoln("ConnectionClosed") + c.wg.Done() + }() + + for { + f, df, err := readFrame(c.Conn) + if err != nil { + return fmt.Errorf("read failed: %s", err) + } + log = log.WithField("received", f) + + // If tp of tp_id exists, attempt to forward frame to tp. + // Delete tp on any failure. + if tp, ok := c.getTp(df.TpID); ok { + if err := tp.HandleFrame(f); err != nil { + log.WithError(err).Warnf("Rejected [%s]: Transport closed.", df.Type) + } + continue + } + c.delTp(df.TpID) // rm tp in case closed tp is not fully removed. + + // if tp does not exist, frame should be 'REQUEST'. + // otherwise, handle any unexpected frames accordingly. + switch df.Type { + case RequestType: + c.wg.Add(1) + go func(log *logrus.Entry) { + defer c.wg.Done() + if initPK, err := c.handleRequestFrame(log, df.TpID, df.Pay); err != nil { + log.WithField("remoteClient", initPK).WithError(err).Warn("Rejected [REQUEST]") + } else { + log.WithField("remoteClient", initPK).Info("Accepted [REQUEST]") + } + }(log) + + default: + log.Debugf("Ignored [%s]: No transport of given ID.", df.Type) + if df.Type != CloseType { + if err := writeCloseFrame(c.Conn, df.TpID, PlaceholderReason); err != nil { + return err + } + } + } + } +} + +// DialTransport dials a transport to remote dms_client. +func (c *ClientConn) DialTransport(ctx context.Context, rPK cipher.PubKey, rPort uint16) (*Transport, error) { + lPort, closeCB, err := c.pm.ReserveEphemeral(ctx) + if err != nil { + return nil, err + } + tp, err := c.addTp(ctx, rPK, lPort, rPort, closeCB) // TODO: Have proper local port. + if err != nil { + return nil, err + } + if err := tp.WriteRequest(); err != nil { + return nil, err + } + if err := tp.ReadAccept(ctx); err != nil { + return nil, err + } + go tp.Serve() + return tp, nil +} + +func (c *ClientConn) close() (closed bool) { + if c == nil { + return false + } + c.once.Do(func() { + closed = true + c.log.WithField("remoteServer", c.srvPK).Infoln("ClosingConnection") + close(c.done) + c.mx.Lock() + for _, tp := range c.tps { + tp := tp + go func() { + if err := tp.Close(); err != nil { + log.WithError(err).Warn("Failed to close transport") + } + }() + } + if err := c.Conn.Close(); err != nil { + log.WithError(err).Warn("Failed to close connection") + } + c.mx.Unlock() + }) + return closed +} + +// Close closes the connection to dms_server. +func (c *ClientConn) Close() error { + if c.close() { + c.wg.Wait() + } + return nil +} + +func (c *ClientConn) isClosed() bool { + select { + case <-c.done: + return true + default: + return false + } +} diff --git a/vendor/github.com/skycoin/dmsg/listener.go b/vendor/github.com/skycoin/dmsg/listener.go new file mode 100644 index 0000000000..3fc6f48a46 --- /dev/null +++ b/vendor/github.com/skycoin/dmsg/listener.go @@ -0,0 +1,124 @@ +package dmsg + +import ( + "fmt" + "net" + "sync" +) + +// Listener listens for remote-initiated transports. +type Listener struct { + addr Addr // local listening address + + accept chan *Transport + mx sync.Mutex // protects 'accept' + + doneFunc func() // callback when done + done chan struct{} + once sync.Once +} + +func newListener(addr Addr) *Listener { + return &Listener{ + addr: addr, + accept: make(chan *Transport, AcceptBufferSize), + done: make(chan struct{}), + } +} + +// AddCloseCallback adds a function that triggers when listener is closed. +// This should be called right after the listener is created and is not thread safe. +func (l *Listener) AddCloseCallback(cb func()) { l.doneFunc = cb } + +// IntroduceTransport handles a transport after receiving a REQUEST frame. +func (l *Listener) IntroduceTransport(tp *Transport) error { + if tp.LocalAddr() != l.addr { + return fmt.Errorf("failed to accept transport as local addresses does not match: we expected %s but got %s", + l.addr, tp.LocalAddr()) + } + + l.mx.Lock() + defer l.mx.Unlock() + + if l.isClosed() { + return ErrClientClosed + } + + select { + case <-l.done: + return ErrClientClosed + + case l.accept <- tp: + if err := tp.WriteAccept(); err != nil { + return err + } + go tp.Serve() + return nil + + default: + _ = tp.Close() //nolint:errcheck + return ErrClientAcceptMaxed + } +} + +// Accept accepts a connection. +func (l *Listener) Accept() (net.Conn, error) { + return l.AcceptTransport() +} + +// AcceptTransport accepts a transport connection. +func (l *Listener) AcceptTransport() (*Transport, error) { + select { + case <-l.done: + return nil, ErrClientClosed + case tp, ok := <-l.accept: + if !ok { + return nil, ErrClientClosed + } + return tp, nil + } +} + +// Close closes the listener. +func (l *Listener) Close() error { + if l.close() { + return nil + } + return ErrClientClosed +} + +func (l *Listener) close() (closed bool) { + l.once.Do(func() { + closed = true + l.doneFunc() + + l.mx.Lock() + defer l.mx.Unlock() + + close(l.done) + for { + select { + case <-l.accept: + default: + close(l.accept) + return + } + } + }) + return closed +} + +func (l *Listener) isClosed() bool { + select { + case <-l.done: + return true + default: + return false + } +} + +// Addr returns the listener's address. +func (l *Listener) Addr() net.Addr { return l.addr } + +// Type returns the transport type. +func (l *Listener) Type() string { return Type } diff --git a/vendor/github.com/skycoin/dmsg/netutil/porter.go b/vendor/github.com/skycoin/dmsg/netutil/porter.go new file mode 100644 index 0000000000..fb0d2c1b26 --- /dev/null +++ b/vendor/github.com/skycoin/dmsg/netutil/porter.go @@ -0,0 +1,102 @@ +package netutil + +import ( + "context" + "sync" +) + +const ( + // PorterMinEphemeral is the default minimum ephemeral port. + PorterMinEphemeral = uint16(49152) +) + +// Porter reserves ports. +type Porter struct { + sync.RWMutex + eph uint16 // current ephemeral value + minEph uint16 // minimal ephemeral port value + ports map[uint16]interface{} +} + +// NewPorter creates a new Porter with a given minimum ephemeral port value. +func NewPorter(minEph uint16) *Porter { + ports := make(map[uint16]interface{}) + ports[0] = struct{}{} // port 0 is invalid + + return &Porter{ + eph: minEph, + minEph: minEph, + ports: ports, + } +} + +// Reserve a given port. +// It returns a boolean informing whether the port is reserved, and a function to clear the reservation. +func (p *Porter) Reserve(port uint16, v interface{}) (bool, func()) { + p.Lock() + defer p.Unlock() + + if _, ok := p.ports[port]; ok { + return false, nil + } + p.ports[port] = v + return true, p.makePortFreer(port) +} + +// ReserveEphemeral reserves a new ephemeral port. +// It returns the reserved ephemeral port, a function to clear the reservation and an error (if any). +func (p *Porter) ReserveEphemeral(ctx context.Context, v interface{}) (uint16, func(), error) { + p.Lock() + defer p.Unlock() + + for { + p.eph++ + if p.eph < p.minEph { + p.eph = p.minEph + } + if _, ok := p.ports[p.eph]; ok { + select { + case <-ctx.Done(): + return 0, nil, ctx.Err() + default: + continue + } + } + p.ports[p.eph] = v + return p.eph, p.makePortFreer(p.eph), nil + } +} + +// PortValue returns the value stored under a given port. +func (p *Porter) PortValue(port uint16) (interface{}, bool) { + p.RLock() + defer p.RUnlock() + + v, ok := p.ports[port] + return v, ok +} + +// RangePortValues ranges all ports that are currently reserved. +func (p *Porter) RangePortValues(fn func(port uint16, v interface{}) (next bool)) { + p.RLock() + defer p.RUnlock() + + for port, v := range p.ports { + if next := fn(port, v); !next { + return + } + } +} + +// This returns a function that frees a given port. +// It is ensured that the function's action is only performed once. +func (p *Porter) makePortFreer(port uint16) func() { + once := new(sync.Once) + return func() { + once.Do(func() { + p.Lock() + delete(p.ports, port) + p.Unlock() + }) + } +} diff --git a/vendor/github.com/skycoin/dmsg/port_manager.go b/vendor/github.com/skycoin/dmsg/port_manager.go new file mode 100644 index 0000000000..0ab5a18e4a --- /dev/null +++ b/vendor/github.com/skycoin/dmsg/port_manager.go @@ -0,0 +1,66 @@ +package dmsg + +import ( + "context" + "sync" + + "github.com/skycoin/dmsg/cipher" + "github.com/skycoin/dmsg/netutil" +) + +// PortManager manages ports of nodes. +type PortManager struct { + lPK cipher.PubKey + p *netutil.Porter +} + +func newPortManager(lPK cipher.PubKey) *PortManager { + return &PortManager{ + lPK: lPK, + p: netutil.NewPorter(netutil.PorterMinEphemeral), + } +} + +// Listener returns a listener assigned to a given port. +func (pm *PortManager) Listener(port uint16) (*Listener, bool) { + v, ok := pm.p.PortValue(port) + if !ok { + return nil, false + } + l, ok := v.(*Listener) + return l, ok +} + +// NewListener assigns listener to port if port is available. +func (pm *PortManager) NewListener(port uint16) (*Listener, bool) { + l := newListener(Addr{pm.lPK, port}) + ok, clear := pm.p.Reserve(port, l) + if !ok { + return nil, false + } + l.AddCloseCallback(clear) + return l, true +} + +// ReserveEphemeral reserves an ephemeral port. +func (pm *PortManager) ReserveEphemeral(ctx context.Context) (uint16, func(), error) { + return pm.p.ReserveEphemeral(ctx, nil) +} + +// Close closes all listeners. +func (pm *PortManager) Close() error { + wg := new(sync.WaitGroup) + pm.p.RangePortValues(func(_ uint16, v interface{}) (next bool) { + l, ok := v.(*Listener) + if ok { + wg.Add(1) + go func() { + l.close() + wg.Done() + }() + } + return true + }) + wg.Wait() + return nil +} diff --git a/vendor/github.com/skycoin/dmsg/server.go b/vendor/github.com/skycoin/dmsg/server.go index 4433b65cd8..a5bfa304c2 100644 --- a/vendor/github.com/skycoin/dmsg/server.go +++ b/vendor/github.com/skycoin/dmsg/server.go @@ -19,238 +19,6 @@ import ( // ErrListenerAlreadyWrappedToNoise occurs when the provided net.Listener is already wrapped with noise.Listener var ErrListenerAlreadyWrappedToNoise = errors.New("listener is already wrapped to *noise.Listener") -// NextConn provides information on the next connection. -type NextConn struct { - conn *ServerConn - id uint16 -} - -func (r *NextConn) writeFrame(ft FrameType, p []byte) error { - if err := writeFrame(r.conn.Conn, MakeFrame(ft, r.id, p)); err != nil { - go func() { - if err := r.conn.Close(); err != nil { - log.WithError(err).Warn("Failed to close connection") - } - }() - return err - } - return nil -} - -// ServerConn is a connection between a dmsg.Server and a dmsg.Client from a server's perspective. -type ServerConn struct { - log *logging.Logger - - net.Conn - remoteClient cipher.PubKey - - nextRespID uint16 - nextConns map[uint16]*NextConn - mx sync.RWMutex -} - -// NewServerConn creates a new connection from the perspective of a dms_server. -func NewServerConn(log *logging.Logger, conn net.Conn, remoteClient cipher.PubKey) *ServerConn { - return &ServerConn{ - log: log, - Conn: conn, - remoteClient: remoteClient, - nextRespID: randID(false), - nextConns: make(map[uint16]*NextConn), - } -} - -func (c *ServerConn) delNext(id uint16) { - c.mx.Lock() - delete(c.nextConns, id) - c.mx.Unlock() -} - -func (c *ServerConn) setNext(id uint16, r *NextConn) { - c.mx.Lock() - c.nextConns[id] = r - c.mx.Unlock() -} - -func (c *ServerConn) getNext(id uint16) (*NextConn, bool) { - c.mx.RLock() - r := c.nextConns[id] - c.mx.RUnlock() - return r, r != nil -} - -func (c *ServerConn) addNext(ctx context.Context, r *NextConn) (uint16, error) { - c.mx.Lock() - defer c.mx.Unlock() - - for { - if r := c.nextConns[c.nextRespID]; r == nil { - break - } - c.nextRespID += 2 - - select { - case <-ctx.Done(): - return 0, ctx.Err() - default: - } - } - - id := c.nextRespID - c.nextRespID = id + 2 - c.nextConns[id] = r - return id, nil -} - -// PK returns the remote dms_client's public key. -func (c *ServerConn) PK() cipher.PubKey { - return c.remoteClient -} - -type getConnFunc func(pk cipher.PubKey) (*ServerConn, bool) - -// Serve handles (and forwards when necessary) incoming frames. -func (c *ServerConn) Serve(ctx context.Context, getConn getConnFunc) (err error) { - log := c.log.WithField("srcClient", c.remoteClient) - - // Only manually close the underlying net.Conn when the done signal is context-initiated. - done := make(chan struct{}) - defer close(done) - go func() { - select { - case <-done: - case <-ctx.Done(): - if err := c.Conn.Close(); err != nil { - log.WithError(err).Warn("failed to close underlying connection") - } - } - }() - - defer func() { - // Send CLOSE frames to all transports which are established with this dmsg.Client - // This ensures that all parties are informed about the transport closing. - c.mx.Lock() - for _, conn := range c.nextConns { - why := byte(0) - if err := conn.writeFrame(CloseType, []byte{why}); err != nil { - log.WithError(err).Warnf("failed to write frame: %s", err) - } - } - c.mx.Unlock() - - log.WithError(err).WithField("connCount", decrementServeCount()).Infoln("ClosingConn") - if err := c.Conn.Close(); err != nil { - log.WithError(err).Warn("Failed to close connection") - } - }() - log.WithField("connCount", incrementServeCount()).Infoln("ServingConn") - - err = c.writeOK() - if err != nil { - return fmt.Errorf("sending OK failed: %s", err) - } - - for { - f, err := readFrame(c.Conn) - if err != nil { - return fmt.Errorf("read failed: %s", err) - } - log = log.WithField("received", f) - - ft, id, p := f.Disassemble() - - switch ft { - case RequestType: - ctx, cancel := context.WithTimeout(ctx, TransportHandshakeTimeout) - _, why, ok := c.handleRequest(ctx, getConn, id, p) - cancel() - if !ok { - log.Debugln("FrameRejected: Erroneous request or unresponsive dstClient.") - if err := c.delChan(id, why); err != nil { - return err - } - } - log.Debugln("FrameForwarded") - - case AcceptType, FwdType, AckType, CloseType: - next, why, ok := c.forwardFrame(ft, id, p) - if !ok { - log.Debugln("FrameRejected: Failed to forward to dstClient.") - // Delete channel (and associations) on failure. - if err := c.delChan(id, why); err != nil { - return err - } - continue - } - log.Debugln("FrameForwarded") - - // On success, if Close frame, delete the associations. - if ft == CloseType { - c.delNext(id) - next.conn.delNext(next.id) - } - - default: - log.Debugln("FrameRejected: Unknown frame type.") - // Unknown frame type. - return errors.New("unknown frame of type received") - } - } -} - -func (c *ServerConn) delChan(id uint16, why byte) error { - c.delNext(id) - if err := writeFrame(c.Conn, MakeFrame(CloseType, id, []byte{why})); err != nil { - return fmt.Errorf("failed to write frame: %s", err) - } - return nil -} - -func (c *ServerConn) writeOK() error { - if err := writeFrame(c.Conn, MakeFrame(OkType, 0, nil)); err != nil { - return err - } - return nil -} - -// nolint:unparam -func (c *ServerConn) forwardFrame(ft FrameType, id uint16, p []byte) (*NextConn, byte, bool) { - next, ok := c.getNext(id) - if !ok { - return next, 0, false - } - if err := next.writeFrame(ft, p); err != nil { - return next, 0, false - } - return next, 0, true -} - -// nolint:unparam -func (c *ServerConn) handleRequest(ctx context.Context, getLink getConnFunc, id uint16, p []byte) (*NextConn, byte, bool) { - initPK, respPK, ok := splitPKs(p) - if !ok || initPK != c.PK() { - return nil, 0, false - } - respL, ok := getLink(respPK) - if !ok { - return nil, 0, false - } - - // set next relations. - respID, err := respL.addNext(ctx, &NextConn{conn: c, id: id}) - if err != nil { - return nil, 0, false - } - next := &NextConn{conn: respL, id: respID} - c.setNext(id, next) - - // forward to responding client. - if err := next.writeFrame(RequestType, p); err != nil { - return next, 0, false - } - return next, 0, true -} - // Server represents a dms_server. type Server struct { log *logging.Logger diff --git a/vendor/github.com/skycoin/dmsg/server_conn.go b/vendor/github.com/skycoin/dmsg/server_conn.go new file mode 100644 index 0000000000..a162b5102b --- /dev/null +++ b/vendor/github.com/skycoin/dmsg/server_conn.go @@ -0,0 +1,243 @@ +package dmsg + +import ( + "context" + "errors" + "fmt" + "net" + "sync" + + "github.com/skycoin/skycoin/src/util/logging" + + "github.com/skycoin/dmsg/cipher" +) + +// NextConn provides information on the next connection. +type NextConn struct { + conn *ServerConn + id uint16 +} + +func (r *NextConn) writeFrame(ft FrameType, p []byte) error { + if err := writeFrame(r.conn.Conn, MakeFrame(ft, r.id, p)); err != nil { + go func() { + if err := r.conn.Close(); err != nil { + log.WithError(err).Warn("Failed to close connection") + } + }() + return err + } + return nil +} + +// ServerConn is a connection between a dmsg.Server and a dmsg.Client from a server's perspective. +type ServerConn struct { + log *logging.Logger + + net.Conn + remoteClient cipher.PubKey + + nextRespID uint16 + nextConns map[uint16]*NextConn + mx sync.RWMutex +} + +// NewServerConn creates a new connection from the perspective of a dms_server. +func NewServerConn(log *logging.Logger, conn net.Conn, remoteClient cipher.PubKey) *ServerConn { + return &ServerConn{ + log: log, + Conn: conn, + remoteClient: remoteClient, + nextRespID: randID(false), + nextConns: make(map[uint16]*NextConn), + } +} + +func (c *ServerConn) delNext(id uint16) { + c.mx.Lock() + delete(c.nextConns, id) + c.mx.Unlock() +} + +func (c *ServerConn) setNext(id uint16, r *NextConn) { + c.mx.Lock() + c.nextConns[id] = r + c.mx.Unlock() +} + +func (c *ServerConn) getNext(id uint16) (*NextConn, bool) { + c.mx.RLock() + r := c.nextConns[id] + c.mx.RUnlock() + return r, r != nil +} + +func (c *ServerConn) addNext(ctx context.Context, r *NextConn) (uint16, error) { + c.mx.Lock() + defer c.mx.Unlock() + + for { + if r := c.nextConns[c.nextRespID]; r == nil { + break + } + c.nextRespID += 2 + + select { + case <-ctx.Done(): + return 0, ctx.Err() + default: + } + } + + id := c.nextRespID + c.nextRespID = id + 2 + c.nextConns[id] = r + return id, nil +} + +// PK returns the remote dms_client's public key. +func (c *ServerConn) PK() cipher.PubKey { + return c.remoteClient +} + +type getConnFunc func(pk cipher.PubKey) (*ServerConn, bool) + +// Serve handles (and forwards when necessary) incoming frames. +func (c *ServerConn) Serve(ctx context.Context, getConn getConnFunc) (err error) { + log := c.log.WithField("srcClient", c.remoteClient) + + // Only manually close the underlying net.Conn when the done signal is context-initiated. + done := make(chan struct{}) + defer close(done) + go func() { + select { + case <-done: + case <-ctx.Done(): + if err := c.Conn.Close(); err != nil { + log.WithError(err).Warn("failed to close underlying connection") + } + } + }() + + defer func() { + // Send CLOSE frames to all transports which are established with this dmsg.Client + // This ensures that all parties are informed about the transport closing. + c.mx.Lock() + for _, conn := range c.nextConns { + why := byte(0) + if err := conn.writeFrame(CloseType, []byte{why}); err != nil { + log.WithError(err).Warnf("failed to write frame: %s", err) + } + } + c.mx.Unlock() + + log.WithError(err).WithField("connCount", decrementServeCount()).Infoln("ClosingConn") + if err := c.Conn.Close(); err != nil { + log.WithError(err).Warn("Failed to close connection") + } + }() + + log.WithField("connCount", incrementServeCount()).Infoln("ServingConn") + + err = c.writeOK() + if err != nil { + return fmt.Errorf("sending OK failed: %s", err) + } + + for { + f, df, err := readFrame(c.Conn) + if err != nil { + return fmt.Errorf("read failed: %s", err) + } + log := log.WithField("received", f) + + switch df.Type { + case RequestType: + ctx, cancel := context.WithTimeout(ctx, TransportHandshakeTimeout) + _, why, ok := c.handleRequest(ctx, getConn, df.TpID, df.Pay) + cancel() + if !ok { + log.Debugln("FrameRejected: Erroneous request or unresponsive dstClient.") + if err := c.delChan(df.TpID, why); err != nil { + return err + } + } + log.Debugln("FrameForwarded") + + case AcceptType, FwdType, AckType, CloseType: + next, why, ok := c.forwardFrame(df.Type, df.TpID, df.Pay) + if !ok { + log.Debugln("FrameRejected: Failed to forward to dstClient.") + // Delete channel (and associations) on failure. + if err := c.delChan(df.TpID, why); err != nil { + return err + } + continue + } + log.Debugln("FrameForwarded") + + // On success, if Close frame, delete the associations. + if df.Type == CloseType { + c.delNext(df.TpID) + next.conn.delNext(next.id) + } + + default: + log.Debugln("FrameRejected: Unknown frame type.") + return errors.New("unknown frame of type received") + } + } +} + +func (c *ServerConn) delChan(id uint16, why byte) error { + c.delNext(id) + if err := writeCloseFrame(c.Conn, id, why); err != nil { + return fmt.Errorf("failed to write frame: %s", err) + } + return nil +} + +func (c *ServerConn) writeOK() error { + if err := writeFrame(c.Conn, MakeFrame(OkType, 0, nil)); err != nil { + return err + } + return nil +} + +// nolint:unparam +func (c *ServerConn) forwardFrame(ft FrameType, id uint16, p []byte) (*NextConn, byte, bool) { + next, ok := c.getNext(id) + if !ok { + return next, 0, false + } + if err := next.writeFrame(ft, p); err != nil { + return next, 0, false + } + return next, 0, true +} + +// nolint:unparam +func (c *ServerConn) handleRequest(ctx context.Context, getLink getConnFunc, id uint16, p []byte) (*NextConn, byte, bool) { + payload, err := unmarshalHandshakePayload(p) + if err != nil || payload.InitAddr.PK != c.PK() { + return nil, 0, false + } + respL, ok := getLink(payload.RespAddr.PK) + if !ok { + return nil, 0, false + } + + // set next relations. + respID, err := respL.addNext(ctx, &NextConn{conn: c, id: id}) + if err != nil { + return nil, 0, false + } + next := &NextConn{conn: respL, id: respID} + c.setNext(id, next) + + // forward to responding client. + if err := next.writeFrame(RequestType, p); err != nil { + return next, 0, false + } + return next, 0, true +} diff --git a/vendor/github.com/skycoin/dmsg/testing.go b/vendor/github.com/skycoin/dmsg/testing.go index ef9095b9f8..49a181b755 100644 --- a/vendor/github.com/skycoin/dmsg/testing.go +++ b/vendor/github.com/skycoin/dmsg/testing.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "net" "testing" "time" @@ -42,10 +43,12 @@ func checkConnCount(t *testing.T, delay time.Duration, count int, ccs ...connCou })) } -func checkTransportsClosed(t *testing.T, transports ...*Transport) { - for _, transport := range transports { - assert.False(t, isDoneChanOpen(transport.done)) - assert.False(t, isReadChanOpen(transport.inCh)) +func checkTransportsClosed(t *testing.T, transports ...net.Conn) { + for _, tr := range transports { + if tr, ok := tr.(*Transport); ok && tr != nil { + assert.False(t, isDoneChanOpen(tr.done)) + assert.False(t, isReadChanOpen(tr.inCh)) + } } } diff --git a/vendor/github.com/skycoin/dmsg/transport.go b/vendor/github.com/skycoin/dmsg/transport.go index 734983de93..5a7467172b 100644 --- a/vendor/github.com/skycoin/dmsg/transport.go +++ b/vendor/github.com/skycoin/dmsg/transport.go @@ -19,16 +19,18 @@ var ( ErrRequestRejected = errors.New("failed to create transport: request rejected") ErrRequestCheckFailed = errors.New("failed to create transport: request check failed") ErrAcceptCheckFailed = errors.New("failed to create transport: accept check failed") + ErrPortNotListening = errors.New("failed to create transport: port not listening") ) -// Transport represents a connection from dmsg.Client to remote dmsg.Client (via dmsg.Server intermediary). +// Transport represents communication between two nodes via a single hop: +// a connection from dmsg.Client to remote dmsg.Client (via dmsg.Server intermediary). type Transport struct { net.Conn // underlying connection to dmsg.Server log *logging.Logger - id uint16 // tp ID that identifies this dmsg.Transport - local cipher.PubKey // local PK - remote cipher.PubKey // remote PK + id uint16 // tp ID that identifies this dmsg.transport + local Addr // local PK + remote Addr // remote PK inCh chan Frame // handles incoming frames (from dmsg.Client) inMx sync.Mutex // protects 'inCh' @@ -39,17 +41,16 @@ type Transport struct { bufCh chan struct{} // chan for indicating whether this is a new FWD frame bufSize int // keeps track of the total size of 'buf' bufMx sync.Mutex // protects fields responsible for handling FWD and ACK frames - rMx sync.Mutex // TODO: (WORKAROUND) concurrent reads seem problematic right now. - serving chan struct{} // chan which closes when serving begins - servingOnce sync.Once // ensures 'serving' only closes once - done chan struct{} // chan which closes when transport stops serving - doneOnce sync.Once // ensures 'done' only closes once - doneFunc func(id uint16) // contains a method to remove the transport from dmsg.Client + serving chan struct{} // chan which closes when serving begins + servingOnce sync.Once // ensures 'serving' only closes once + done chan struct{} // chan which closes when transport stops serving + doneOnce sync.Once // ensures 'done' only closes once + doneFunc func() // contains a method that triggers when dmsg.Client closes } // NewTransport creates a new dms_tp. -func NewTransport(conn net.Conn, log *logging.Logger, local, remote cipher.PubKey, id uint16, doneFunc func(id uint16)) *Transport { +func NewTransport(conn net.Conn, log *logging.Logger, local, remote Addr, id uint16, doneFunc func()) *Transport { tp := &Transport{ Conn: conn, log: log, @@ -94,7 +95,7 @@ func (tp *Transport) close() (closed bool) { closed = true close(tp.done) - tp.doneFunc(tp.id) + tp.doneFunc() tp.bufMx.Lock() close(tp.bufCh) @@ -113,7 +114,7 @@ func (tp *Transport) close() (closed bool) { // Close closes the dmsg_tp. func (tp *Transport) Close() error { if tp.close() { - if err := writeFrame(tp.Conn, MakeFrame(CloseType, tp.id, []byte{0})); err != nil { + if err := writeCloseFrame(tp.Conn, tp.id, PlaceholderReason); err != nil { log.WithError(err).Warn("Failed to write frame") } } @@ -132,14 +133,20 @@ func (tp *Transport) IsClosed() bool { // LocalPK returns the local public key of the transport. func (tp *Transport) LocalPK() cipher.PubKey { - return tp.local + return tp.local.PK } // RemotePK returns the remote public key of the transport. func (tp *Transport) RemotePK() cipher.PubKey { - return tp.remote + return tp.remote.PK } +// LocalAddr returns local address in from : +func (tp *Transport) LocalAddr() net.Addr { return tp.local } + +// RemoteAddr returns remote address in form : +func (tp *Transport) RemoteAddr() net.Addr { return tp.remote } + // Type returns the transport type. func (tp *Transport) Type() string { return Type @@ -163,7 +170,16 @@ func (tp *Transport) HandleFrame(f Frame) error { // WriteRequest writes a REQUEST frame to dmsg_server to be forwarded to associated client. func (tp *Transport) WriteRequest() error { - f := MakeFrame(RequestType, tp.id, combinePKs(tp.local, tp.remote)) + payload := HandshakePayload{ + Version: HandshakePayloadVersion, + InitAddr: tp.local, + RespAddr: tp.remote, + } + payloadBytes, err := marshalHandshakePayload(payload) + if err != nil { + return err + } + f := MakeFrame(RequestType, tp.id, payloadBytes) if err := writeFrame(tp.Conn, f); err != nil { tp.log.WithError(err).Error("HandshakeFailed") tp.close() @@ -182,7 +198,7 @@ func (tp *Transport) WriteAccept() (err error) { } }() - f := MakeFrame(AcceptType, tp.id, combinePKs(tp.remote, tp.local)) + f := MakeFrame(AcceptType, tp.id, combinePKs(tp.remote.PK, tp.local.PK)) if err = writeFrame(tp.Conn, f); err != nil { tp.close() return err @@ -225,7 +241,7 @@ func (tp *Transport) ReadAccept(ctx context.Context) (err error) { // - resp_pk should be of remote client. // - use an even number with the intermediary dmsg_server. initPK, respPK, ok := splitPKs(p) - if !ok || initPK != tp.local || respPK != tp.remote || !isInitiatorID(id) { + if !ok || initPK != tp.local.PK || respPK != tp.remote.PK || !isInitiatorID(id) { if err := tp.Close(); err != nil { log.WithError(err).Warn("Failed to close transport") } @@ -257,7 +273,7 @@ func (tp *Transport) Serve() { // also write CLOSE frame if this is the first time 'close' is triggered defer func() { if tp.close() { - if err := writeCloseFrame(tp.Conn, tp.id, 0); err != nil { + if err := writeCloseFrame(tp.Conn, tp.id, PlaceholderReason); err != nil { log.WithError(err).Warn("Failed to write close frame") } } @@ -342,9 +358,6 @@ func (tp *Transport) Serve() { func (tp *Transport) Read(p []byte) (n int, err error) { <-tp.serving - tp.rMx.Lock() - defer tp.rMx.Unlock() - startRead: tp.bufMx.Lock() n, err = tp.buf.Read(p) diff --git a/vendor/github.com/skycoin/dmsg/frame.go b/vendor/github.com/skycoin/dmsg/types.go similarity index 66% rename from vendor/github.com/skycoin/dmsg/frame.go rename to vendor/github.com/skycoin/dmsg/types.go index 78e10edf5f..dcaabe6db6 100644 --- a/vendor/github.com/skycoin/dmsg/frame.go +++ b/vendor/github.com/skycoin/dmsg/types.go @@ -2,6 +2,7 @@ package dmsg import ( "encoding/binary" + "encoding/json" "fmt" "io" "math" @@ -16,6 +17,9 @@ import ( const ( // Type returns the transport type string. Type = "dmsg" + // HandshakePayloadVersion contains payload version to maintain compatibility with future versions + // of HandshakePayload format. + HandshakePayloadVersion = "2.0" tpBufCap = math.MaxUint16 tpBufFrameCap = math.MaxUint8 @@ -31,6 +35,43 @@ var ( AcceptBufferSize = 20 ) +// Addr implements net.Addr for dmsg addresses. +type Addr struct { + PK cipher.PubKey `json:"public_key"` + Port uint16 `json:"port"` +} + +// Network returns "dmsg" +func (Addr) Network() string { + return Type +} + +// String returns public key and port of node split by colon. +func (a Addr) String() string { + if a.Port == 0 { + return fmt.Sprintf("%s:~", a.PK) + } + return fmt.Sprintf("%s:%d", a.PK, a.Port) +} + +// HandshakePayload represents format of payload sent with REQUEST frames. +type HandshakePayload struct { + Version string `json:"version"` // just in case the struct changes. + InitAddr Addr `json:"init_address"` + RespAddr Addr `json:"resp_address"` +} + +func marshalHandshakePayload(p HandshakePayload) ([]byte, error) { + return json.Marshal(p) +} + +func unmarshalHandshakePayload(b []byte) (HandshakePayload, error) { + var p HandshakePayload + err := json.Unmarshal(b, &p) + return p, err +} + +// determines whether the transport ID is of an initiator or responder. func isInitiatorID(tpID uint16) bool { return tpID%2 == 0 } func randID(initiator bool) uint16 { @@ -43,6 +84,7 @@ func randID(initiator bool) uint16 { } } +// serveCount records the number of dmsg.Servers connected var serveCount int64 func incrementServeCount() int64 { return atomic.AddInt64(&serveCount, 1) } @@ -76,6 +118,11 @@ const ( AckType = FrameType(0xb) ) +// Reasons for closing frames +const ( + PlaceholderReason = iota +) + // Frame is the dmsg data unit. type Frame []byte @@ -116,24 +163,36 @@ func (f Frame) String() string { return fmt.Sprintf("%s", f.Type(), f.TpID(), f.PayLen(), p) } -func readFrame(r io.Reader) (Frame, error) { - f := make(Frame, headerLen) - if _, err := io.ReadFull(r, f); err != nil { - return nil, err +type disassembledFrame struct { + Type FrameType + TpID uint16 + Pay []byte +} + +// read and disassembles frame from reader +func readFrame(r io.Reader) (f Frame, df disassembledFrame, err error) { + f = make(Frame, headerLen) + if _, err = io.ReadFull(r, f); err != nil { + return } f = append(f, make([]byte, f.PayLen())...) - _, err := io.ReadFull(r, f[headerLen:]) - return f, err + if _, err = io.ReadFull(r, f[headerLen:]); err != nil { + return + } + t, id, p := f.Disassemble() + df = disassembledFrame{Type: t, TpID: id, Pay: p} + return } type writeError struct{ error } func (e *writeError) Error() string { return "write error: " + e.error.Error() } -func isWriteError(err error) bool { - _, ok := err.(*writeError) - return ok -} +// TODO(evanlinjin): Determine if this is still needed, may be useful elsewhere. +//func isWriteError(err error) bool { +// _, ok := err.(*writeError) +// return ok +//} func writeFrame(w io.Writer, f Frame) error { _, err := w.Write(f) diff --git a/vendor/modules.txt b/vendor/modules.txt index b859dcfc63..4ad29bf678 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -64,10 +64,11 @@ github.com/prometheus/procfs/internal/fs # github.com/sirupsen/logrus v1.4.2 github.com/sirupsen/logrus github.com/sirupsen/logrus/hooks/syslog -# github.com/skycoin/dmsg v0.0.0-20190805065636-70f4c32a994f +# github.com/skycoin/dmsg v0.0.0-20190805065636-70f4c32a994f => ../dmsg github.com/skycoin/dmsg/cipher github.com/skycoin/dmsg github.com/skycoin/dmsg/disc +github.com/skycoin/dmsg/netutil github.com/skycoin/dmsg/noise github.com/skycoin/dmsg/ioutil # github.com/skycoin/skycoin v0.26.0