diff --git a/go.sum b/go.sum index 0085d97472..afc2caf241 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,7 @@ github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/ github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/hashicorp/yamux v0.0.0-20190923154419-df201c70410d/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= @@ -171,6 +172,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190627182818-9947fec5c3ab/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20191206204035-259af5ff87bd/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/app/appnet/addr.go b/pkg/app/appnet/addr.go index 3f02b74443..ba6ddfc493 100644 --- a/pkg/app/appnet/addr.go +++ b/pkg/app/appnet/addr.go @@ -34,6 +34,7 @@ func (a Addr) String() string { if a.Port == 0 { return fmt.Sprintf("%s:~", a.PubKey) } + return fmt.Sprintf("%s:%d", a.PubKey, a.Port) } diff --git a/pkg/app/appnet/networker.go b/pkg/app/appnet/networker.go index 62377441e0..1ffd638979 100644 --- a/pkg/app/appnet/networker.go +++ b/pkg/app/appnet/networker.go @@ -38,12 +38,15 @@ func AddNetworker(t Type, n Networker) error { // ResolveNetworker resolves Networker by `network`. func ResolveNetworker(t Type) (Networker, error) { networkersMx.RLock() + n, ok := networkers[t] if !ok { networkersMx.RUnlock() return nil, ErrNoSuchNetworker } + networkersMx.RUnlock() + return n, nil } diff --git a/pkg/app/appnet/networker_test.go b/pkg/app/appnet/networker_test.go index 0254feb82d..f6b06caeb4 100644 --- a/pkg/app/appnet/networker_test.go +++ b/pkg/app/appnet/networker_test.go @@ -15,6 +15,7 @@ func TestAddNetworker(t *testing.T) { ClearNetworkers() nType := TypeDMSG + var n Networker err := AddNetworker(nType, n) @@ -28,6 +29,7 @@ func TestResolveNetworker(t *testing.T) { ClearNetworkers() nType := TypeDMSG + var n Networker n, err := ResolveNetworker(nType) diff --git a/pkg/app/appnet/skywire_networker.go b/pkg/app/appnet/skywire_networker.go index d7902fab0e..afb89f2d1a 100644 --- a/pkg/app/appnet/skywire_networker.go +++ b/pkg/app/appnet/skywire_networker.go @@ -3,6 +3,7 @@ package appnet import ( "context" "errors" + "io" "net" "sync" "sync/atomic" @@ -79,7 +80,7 @@ func (r *SkywireNetworker) ListenContext(ctx context.Context, addr Addr) (net.Li if atomic.CompareAndSwapInt32(&r.isServing, 0, 1) { go func() { - if err := r.serve(ctx); err != nil { + if err := r.serveLoop(ctx); err != nil { r.log.WithError(err).Error("error serving") } }() @@ -88,10 +89,11 @@ func (r *SkywireNetworker) ListenContext(ctx context.Context, addr Addr) (net.Li return lis, nil } -// serve accepts and serves routes. -func (r *SkywireNetworker) serve(ctx context.Context) error { +// serveLoop accepts and serves routes. +func (r *SkywireNetworker) serveLoop(ctx context.Context) error { for { r.log.Infoln("Trying to accept routing group...") + rg, err := r.r.AcceptRoutes(ctx) if err != nil { r.log.Infof("Error accepting routing group: %v", err) @@ -100,39 +102,42 @@ func (r *SkywireNetworker) serve(ctx context.Context) error { r.log.Infoln("Accepted routing group") - go r.serveRG(rg) + go r.serve(rg) } } // serveRG passes accepted router group to the corresponding listener. -func (r *SkywireNetworker) serveRG(rg *router.RouteGroup) { - localAddr, ok := rg.LocalAddr().(routing.Addr) +func (r *SkywireNetworker) serve(conn net.Conn) { + localAddr, ok := conn.LocalAddr().(routing.Addr) if !ok { - r.closeRG(rg) + r.close(conn) r.log.Error("wrong type of addr in accepted conn") + return } lisIfc, ok := r.porter.PortValue(uint16(localAddr.Port)) if !ok { - r.closeRG(rg) + r.close(conn) r.log.Errorf("no listener on port %d", localAddr.Port) + return } lis, ok := lisIfc.(*skywireListener) if !ok { - r.closeRG(rg) + r.close(conn) r.log.Errorf("wrong type of listener on port %d", localAddr.Port) + return } - lis.putConn(rg) + lis.putConn(conn) } // closeRG closes router group and logs error if any. -func (r *SkywireNetworker) closeRG(rg *router.RouteGroup) { - if err := rg.Close(); err != nil { +func (r *SkywireNetworker) close(closer io.Closer) { + if err := closer.Close(); err != nil { r.log.Error(err) } } @@ -191,6 +196,7 @@ type skywireConn struct { // Close closes connection. func (c *skywireConn) Close() error { var err error + c.once.Do(func() { defer func() { c.freePortMx.RLock() diff --git a/pkg/app/appnet/type_test.go b/pkg/app/appnet/type_test.go index eed295f983..66a94311bb 100644 --- a/pkg/app/appnet/type_test.go +++ b/pkg/app/appnet/type_test.go @@ -24,6 +24,7 @@ func TestType_IsValid(t *testing.T) { } for _, tc := range tt { + tc := tc t.Run(tc.name, func(t *testing.T) { valid := tc.t.IsValid() require.Equal(t, tc.want, valid)