Skip to content

Commit

Permalink
Fixed managedTransport closing logic and parsing logic in 'network' m…
Browse files Browse the repository at this point in the history
…odule.
  • Loading branch information
Evan Lin committed Aug 16, 2019
1 parent c0a07a9 commit b5909eb
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 33 deletions.
11 changes: 7 additions & 4 deletions pkg/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"errors"
"fmt"
"github.com/skycoin/skycoin/src/util/logging"
"net"
"strings"
"sync"

"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/dmsg"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/dmsg/disc"
Expand Down Expand Up @@ -49,7 +50,7 @@ type Network struct {
func New(conf Config) *Network {
dmsgC := dmsg.NewClient(conf.PubKey, conf.SecKey, disc.NewHTTP(conf.DmsgDiscAddr), dmsg.SetLogger(logging.MustGetLogger("network.dmsgC")))
return &Network{
conf: conf,
conf: conf,
dmsgC: dmsgC,
}
}
Expand Down Expand Up @@ -165,8 +166,10 @@ func disassembleAddr(addr net.Addr) (pk cipher.PubKey, port uint16) {
if err := pk.Set(strs[0]); err != nil {
panic(fmt.Errorf("network.disassembleAddr: %v %s", err, addr.String()))
}
if _, err := fmt.Sscanf(strs[1], "%d", &port); err != nil {
panic(fmt.Errorf("network.disassembleAddr: %v", err))
if strs[1] != "~" {
if _, err := fmt.Sscanf(strs[1], "%d", &port); err != nil {
panic(fmt.Errorf("network.disassembleAddr: %v", err))
}
}
return
}
2 changes: 1 addition & 1 deletion pkg/network/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,4 @@ package network
// close(errCh)
// }()
// return srv, errCh
//}
//}
5 changes: 3 additions & 2 deletions pkg/setup/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"encoding/json"
"errors"
"fmt"
"time"

"github.com/skycoin/dmsg"

"github.com/skycoin/skywire/pkg/network"
"time"

"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/dmsg/disc"
Expand Down Expand Up @@ -76,7 +78,6 @@ func (sn *Node) Serve(ctx context.Context) error {
}
sn.Logger.Info("Connected to messaging servers")


sn.Logger.Info("Starting Setup Node")

for {
Expand Down
1 change: 1 addition & 0 deletions pkg/transport/managed_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet, done <-chan stru
}
select {
case <-done:
return
case readCh <- p:
}
}
Expand Down
19 changes: 10 additions & 9 deletions pkg/transport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Manager struct {

readCh chan routing.Packet
mx sync.RWMutex
wg sync.WaitGroup
done chan struct{}
}

Expand All @@ -62,7 +63,6 @@ func NewManager(n *network.Network, config *ManagerConfig) (*Manager, error) {
// Serve runs listening loop across all registered factories.
func (tm *Manager) Serve(ctx context.Context) error {
var listeners []*network.Listener
var wg sync.WaitGroup

for _, netName := range tm.conf.Networks {
lis, err := tm.n.Listen(netName, network.TransportPort)
Expand All @@ -73,9 +73,9 @@ func (tm *Manager) Serve(ctx context.Context) error {
tm.Logger.Infof("listening on network: %s", netName)
listeners = append(listeners, lis)

wg.Add(1)
tm.wg.Add(1)
go func(netName string) {
defer wg.Done()
defer tm.wg.Done()
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -108,8 +108,6 @@ func (tm *Manager) Serve(ctx context.Context) error {
}
}

wg.Wait()
close(tm.readCh)
return nil
}

Expand Down Expand Up @@ -275,15 +273,18 @@ func (tm *Manager) Close() error {

close(tm.done)

i, statuses := 0, make([]*Status, len(tm.tps))
statuses := make([]*Status, 0, len(tm.tps))
for _, tr := range tm.tps {
tr.close()
statuses[i] = &Status{ID: tr.Entry.ID, IsUp: false}
i++
if closed := tr.close(); closed {
statuses = append(statuses[0:], &Status{ID: tr.Entry.ID, IsUp: false})
}
}
if _, err := tm.conf.DiscoveryClient.UpdateStatuses(context.Background(), statuses...); err != nil {
tm.Logger.Warnf("failed to update transport statuses: %v", err)
}

tm.wg.Wait()
close(tm.readCh)
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ package transport

import (
"crypto/sha256"
"math/big"

"github.com/google/uuid"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/skycoin/src/util/logging"
"math/big"
)

var log = logging.MustGetLogger("transport")
Expand Down
34 changes: 18 additions & 16 deletions pkg/visor/visor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ import (
"context"
"errors"
"fmt"
"github.com/skycoin/dmsg"
"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/skywire/pkg/network"
"io"
"net"
"net/rpc"
Expand All @@ -22,6 +19,11 @@ import (
"syscall"
"time"

"github.com/skycoin/dmsg"
"github.com/skycoin/dmsg/cipher"

"github.com/skycoin/skywire/pkg/network"

"github.com/skycoin/dmsg/noise"
"github.com/skycoin/skycoin/src/util/logging"

Expand Down Expand Up @@ -86,12 +88,12 @@ type PacketRouter interface {
// Node provides messaging runtime for Apps by setting up all
// necessary connections and performing messaging gateway functions.
type Node struct {
config *Config
router PacketRouter
n *network.Network
tm *transport.Manager
rt routing.Table
executer appExecuter
config *Config
router PacketRouter
n *network.Network
tm *transport.Manager
rt routing.Table
executer appExecuter

Logger *logging.MasterLogger
logger *logging.Logger
Expand Down Expand Up @@ -127,11 +129,11 @@ func NewNode(config *Config, masterLogger *logging.MasterLogger) (*Node, error)

fmt.Println("min servers:", config.Messaging.ServerCount)
node.n = network.New(network.Config{
PubKey: pk,
SecKey: sk,
TpNetworks: []string{dmsg.Type}, // TODO: Have some way to configure this.
PubKey: pk,
SecKey: sk,
TpNetworks: []string{dmsg.Type}, // TODO: Have some way to configure this.
DmsgDiscAddr: config.Messaging.Discovery,
DmsgMinSrvs: config.Messaging.ServerCount,
DmsgMinSrvs: config.Messaging.ServerCount,
})
if err := node.n.Init(ctx); err != nil {
return nil, fmt.Errorf("failed to init network: %v", err)
Expand All @@ -146,10 +148,10 @@ func NewNode(config *Config, masterLogger *logging.MasterLogger) (*Node, error)
return nil, fmt.Errorf("invalid TransportLogStore: %s", err)
}
tmConfig := &transport.ManagerConfig{
PubKey: pk,
SecKey: sk,
PubKey: pk,
SecKey: sk,
DefaultNodes: config.TrustedNodes,
Networks: []string{dmsg.Type}, // TODO: Have some way to configure this.
Networks: []string{dmsg.Type}, // TODO: Have some way to configure this.
DiscoveryClient: trDiscovery,
LogStore: logStore,
}
Expand Down
3 changes: 3 additions & 0 deletions vendor/github.com/skycoin/dmsg/addr.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b5909eb

Please sign in to comment.