From b5909ebadb24f19f3f66a83f764fc2fa36b0e997 Mon Sep 17 00:00:00 2001 From: Evan Lin Date: Fri, 16 Aug 2019 14:44:29 +0800 Subject: [PATCH] Fixed managedTransport closing logic and parsing logic in 'network' module. --- pkg/network/network.go | 11 ++++++--- pkg/network/testing.go | 2 +- pkg/setup/node.go | 5 ++-- pkg/transport/managed_transport.go | 1 + pkg/transport/manager.go | 19 +++++++------- pkg/transport/transport.go | 3 ++- pkg/visor/visor.go | 34 ++++++++++++++------------ vendor/github.com/skycoin/dmsg/addr.go | 3 +++ 8 files changed, 45 insertions(+), 33 deletions(-) diff --git a/pkg/network/network.go b/pkg/network/network.go index e8f70cb65..faa9944d6 100644 --- a/pkg/network/network.go +++ b/pkg/network/network.go @@ -4,11 +4,12 @@ import ( "context" "errors" "fmt" - "github.com/skycoin/skycoin/src/util/logging" "net" "strings" "sync" + "github.com/skycoin/skycoin/src/util/logging" + "github.com/skycoin/dmsg" "github.com/skycoin/dmsg/cipher" "github.com/skycoin/dmsg/disc" @@ -49,7 +50,7 @@ type Network struct { func New(conf Config) *Network { dmsgC := dmsg.NewClient(conf.PubKey, conf.SecKey, disc.NewHTTP(conf.DmsgDiscAddr), dmsg.SetLogger(logging.MustGetLogger("network.dmsgC"))) return &Network{ - conf: conf, + conf: conf, dmsgC: dmsgC, } } @@ -165,8 +166,10 @@ func disassembleAddr(addr net.Addr) (pk cipher.PubKey, port uint16) { if err := pk.Set(strs[0]); err != nil { panic(fmt.Errorf("network.disassembleAddr: %v %s", err, addr.String())) } - if _, err := fmt.Sscanf(strs[1], "%d", &port); err != nil { - panic(fmt.Errorf("network.disassembleAddr: %v", err)) + if strs[1] != "~" { + if _, err := fmt.Sscanf(strs[1], "%d", &port); err != nil { + panic(fmt.Errorf("network.disassembleAddr: %v", err)) + } } return } diff --git a/pkg/network/testing.go b/pkg/network/testing.go index 852405502..fea0d892a 100644 --- a/pkg/network/testing.go +++ b/pkg/network/testing.go @@ -84,4 +84,4 @@ package network // close(errCh) // }() // return srv, errCh -//} \ No newline at end of file +//} diff --git a/pkg/setup/node.go b/pkg/setup/node.go index 67d7eb990..bead044b8 100644 --- a/pkg/setup/node.go +++ b/pkg/setup/node.go @@ -5,9 +5,11 @@ import ( "encoding/json" "errors" "fmt" + "time" + "github.com/skycoin/dmsg" + "github.com/skycoin/skywire/pkg/network" - "time" "github.com/skycoin/dmsg/cipher" "github.com/skycoin/dmsg/disc" @@ -76,7 +78,6 @@ func (sn *Node) Serve(ctx context.Context) error { } sn.Logger.Info("Connected to messaging servers") - sn.Logger.Info("Starting Setup Node") for { diff --git a/pkg/transport/managed_transport.go b/pkg/transport/managed_transport.go index 9746d4fc1..21c7f85ee 100644 --- a/pkg/transport/managed_transport.go +++ b/pkg/transport/managed_transport.go @@ -125,6 +125,7 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet, done <-chan stru } select { case <-done: + return case readCh <- p: } } diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index 25cb57542..d0d8444a8 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -37,6 +37,7 @@ type Manager struct { readCh chan routing.Packet mx sync.RWMutex + wg sync.WaitGroup done chan struct{} } @@ -62,7 +63,6 @@ func NewManager(n *network.Network, config *ManagerConfig) (*Manager, error) { // Serve runs listening loop across all registered factories. func (tm *Manager) Serve(ctx context.Context) error { var listeners []*network.Listener - var wg sync.WaitGroup for _, netName := range tm.conf.Networks { lis, err := tm.n.Listen(netName, network.TransportPort) @@ -73,9 +73,9 @@ func (tm *Manager) Serve(ctx context.Context) error { tm.Logger.Infof("listening on network: %s", netName) listeners = append(listeners, lis) - wg.Add(1) + tm.wg.Add(1) go func(netName string) { - defer wg.Done() + defer tm.wg.Done() for { select { case <-ctx.Done(): @@ -108,8 +108,6 @@ func (tm *Manager) Serve(ctx context.Context) error { } } - wg.Wait() - close(tm.readCh) return nil } @@ -275,15 +273,18 @@ func (tm *Manager) Close() error { close(tm.done) - i, statuses := 0, make([]*Status, len(tm.tps)) + statuses := make([]*Status, 0, len(tm.tps)) for _, tr := range tm.tps { - tr.close() - statuses[i] = &Status{ID: tr.Entry.ID, IsUp: false} - i++ + if closed := tr.close(); closed { + statuses = append(statuses[0:], &Status{ID: tr.Entry.ID, IsUp: false}) + } } if _, err := tm.conf.DiscoveryClient.UpdateStatuses(context.Background(), statuses...); err != nil { tm.Logger.Warnf("failed to update transport statuses: %v", err) } + + tm.wg.Wait() + close(tm.readCh) return nil } diff --git a/pkg/transport/transport.go b/pkg/transport/transport.go index 362b157ee..d4af4a0e4 100644 --- a/pkg/transport/transport.go +++ b/pkg/transport/transport.go @@ -4,10 +4,11 @@ package transport import ( "crypto/sha256" + "math/big" + "github.com/google/uuid" "github.com/skycoin/dmsg/cipher" "github.com/skycoin/skycoin/src/util/logging" - "math/big" ) var log = logging.MustGetLogger("transport") diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index 037b6add3..8b42030b1 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -6,9 +6,6 @@ import ( "context" "errors" "fmt" - "github.com/skycoin/dmsg" - "github.com/skycoin/dmsg/cipher" - "github.com/skycoin/skywire/pkg/network" "io" "net" "net/rpc" @@ -22,6 +19,11 @@ import ( "syscall" "time" + "github.com/skycoin/dmsg" + "github.com/skycoin/dmsg/cipher" + + "github.com/skycoin/skywire/pkg/network" + "github.com/skycoin/dmsg/noise" "github.com/skycoin/skycoin/src/util/logging" @@ -86,12 +88,12 @@ type PacketRouter interface { // Node provides messaging runtime for Apps by setting up all // necessary connections and performing messaging gateway functions. type Node struct { - config *Config - router PacketRouter - n *network.Network - tm *transport.Manager - rt routing.Table - executer appExecuter + config *Config + router PacketRouter + n *network.Network + tm *transport.Manager + rt routing.Table + executer appExecuter Logger *logging.MasterLogger logger *logging.Logger @@ -127,11 +129,11 @@ func NewNode(config *Config, masterLogger *logging.MasterLogger) (*Node, error) fmt.Println("min servers:", config.Messaging.ServerCount) node.n = network.New(network.Config{ - PubKey: pk, - SecKey: sk, - TpNetworks: []string{dmsg.Type}, // TODO: Have some way to configure this. + PubKey: pk, + SecKey: sk, + TpNetworks: []string{dmsg.Type}, // TODO: Have some way to configure this. DmsgDiscAddr: config.Messaging.Discovery, - DmsgMinSrvs: config.Messaging.ServerCount, + DmsgMinSrvs: config.Messaging.ServerCount, }) if err := node.n.Init(ctx); err != nil { return nil, fmt.Errorf("failed to init network: %v", err) @@ -146,10 +148,10 @@ func NewNode(config *Config, masterLogger *logging.MasterLogger) (*Node, error) return nil, fmt.Errorf("invalid TransportLogStore: %s", err) } tmConfig := &transport.ManagerConfig{ - PubKey: pk, - SecKey: sk, + PubKey: pk, + SecKey: sk, DefaultNodes: config.TrustedNodes, - Networks: []string{dmsg.Type}, // TODO: Have some way to configure this. + Networks: []string{dmsg.Type}, // TODO: Have some way to configure this. DiscoveryClient: trDiscovery, LogStore: logStore, } diff --git a/vendor/github.com/skycoin/dmsg/addr.go b/vendor/github.com/skycoin/dmsg/addr.go index 65e7f71b2..2be739b40 100644 --- a/vendor/github.com/skycoin/dmsg/addr.go +++ b/vendor/github.com/skycoin/dmsg/addr.go @@ -19,5 +19,8 @@ func (Addr) Network() string { // String returns public key and port of node split by colon. func (a Addr) String() string { + if a.Port == 0 { + return fmt.Sprintf("%s:~", a.PK) + } return fmt.Sprintf("%s:%d", a.PK, a.Port) }