Skip to content

Commit

Permalink
Attempt to fix broken code
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Jul 8, 2019
1 parent 7352b58 commit 3b67aa1
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 132 deletions.
2 changes: 1 addition & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func NewNode(config *Config, masterLogger *logging.MasterLogger) (*Node, error)
LogStore: logStore,
DefaultNodes: config.TrustedNodes,
}
node.tm, err = transport.NewManager(tmConfig, node.messenger)
node.tm, err = transport.NewManager(tmConfig, config.Routing.SetupNodes, node.messenger)
if err != nil {
return nil, fmt.Errorf("transport manager: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestNodeStartClose(t *testing.T) {
var err error

tmConf := &transport.ManagerConfig{PubKey: cipher.PubKey{}, DiscoveryClient: transport.NewDiscoveryMock()}
node.tm, err = transport.NewManager(tmConf, node.messenger)
node.tm, err = transport.NewManager(tmConf, nil, node.messenger)
require.NoError(t, err)

errCh := make(chan error)
Expand Down
32 changes: 16 additions & 16 deletions pkg/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ func TestRouterForwarding(t *testing.T) {
f3.SetType("mock2")
f4.SetType("mock2")

m1, err := transport.NewManager(c1, f1)
m1, err := transport.NewManager(c1, nil, f1)
require.NoError(t, err)

m2, err := transport.NewManager(c2, f2, f3)
m2, err := transport.NewManager(c2, nil, f2, f3)
require.NoError(t, err)

m3, err := transport.NewManager(c3, f4)
m3, err := transport.NewManager(c3, nil, f4)
require.NoError(t, err)

rt := routing.InMemoryRoutingTable()
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestRouterAppInit(t *testing.T) {
pk1, sk1 := cipher.GenerateKeyPair()
c1 := &transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore}

m1, err := transport.NewManager(c1)
m1, err := transport.NewManager(c1, nil)
require.NoError(t, err)

conf := &Config{
Expand Down Expand Up @@ -154,10 +154,10 @@ func TestRouterApp(t *testing.T) {
c2 := &transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}

f1, f2 := transport.NewMockFactoryPair(pk1, pk2)
m1, err := transport.NewManager(c1, f1)
m1, err := transport.NewManager(c1, nil, f1)
require.NoError(t, err)

m2, err := transport.NewManager(c2, f2)
m2, err := transport.NewManager(c2, nil, f2)
require.NoError(t, err)

go m2.Serve(context.TODO()) // nolint
Expand Down Expand Up @@ -228,7 +228,7 @@ func TestRouterLocalApp(t *testing.T) {
logStore := transport.InMemoryTransportLogStore()

pk, sk := cipher.GenerateKeyPair()
m, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk, SecKey: sk, DiscoveryClient: client, LogStore: logStore})
m, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk, SecKey: sk, DiscoveryClient: client, LogStore: logStore}, nil)
require.NoError(t, err)

conf := &Config{
Expand Down Expand Up @@ -285,10 +285,10 @@ func TestRouterSetup(t *testing.T) {
c2 := &transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}

f1, f2 := transport.NewMockFactoryPair(pk1, pk2)
m1, err := transport.NewManager(c1, f1)
m1, err := transport.NewManager(c1, nil, f1)
require.NoError(t, err)

m2, err := transport.NewManager(c2, f2)
m2, err := transport.NewManager(c2, nil, f2)
require.NoError(t, err)

rt := routing.InMemoryRoutingTable()
Expand Down Expand Up @@ -433,10 +433,10 @@ func TestRouterSetupLoop(t *testing.T) {
f1.SetType(dmsg.Type)
f2.SetType(dmsg.Type)

m1, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore}, f1)
m1, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore}, nil, f1)
require.NoError(t, err)

m2, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}, f2)
m2, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}, nil, f2)
require.NoError(t, err)
go m2.Serve(context.TODO()) // nolint: errcheck

Expand Down Expand Up @@ -538,10 +538,10 @@ func TestRouterCloseLoop(t *testing.T) {
f1, f2 := transport.NewMockFactoryPair(pk1, pk2)
f1.SetType(dmsg.Type)

m1, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore}, f1)
m1, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore}, nil, f1)
require.NoError(t, err)

m2, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}, f2)
m2, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}, nil, f2)
require.NoError(t, err)
go m2.Serve(context.TODO()) // nolint: errcheck

Expand Down Expand Up @@ -632,10 +632,10 @@ func TestRouterCloseLoopOnAppClose(t *testing.T) {
f1, f2 := transport.NewMockFactoryPair(pk1, pk2)
f1.SetType(dmsg.Type)

m1, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore}, f1)
m1, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore}, nil, f1)
require.NoError(t, err)

m2, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}, f2)
m2, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}, nil, f2)
require.NoError(t, err)
go m2.Serve(context.TODO()) // nolint: errcheck

Expand Down Expand Up @@ -716,7 +716,7 @@ func TestRouterRouteExpiration(t *testing.T) {
logStore := transport.InMemoryTransportLogStore()

pk, sk := cipher.GenerateKeyPair()
m, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk, SecKey: sk, DiscoveryClient: client, LogStore: logStore})
m, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk, SecKey: sk, DiscoveryClient: client, LogStore: logStore}, nil)
require.NoError(t, err)

rt := routing.InMemoryRoutingTable()
Expand Down
98 changes: 0 additions & 98 deletions pkg/setup/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package setup

import (
"context"
"encoding/json"
"errors"
"log"
"os"
"sync"
"testing"
"time"

Expand All @@ -19,7 +17,6 @@ import (

"github.com/skycoin/skywire/pkg/metrics"
"github.com/skycoin/skywire/pkg/routing"
"github.com/skycoin/skywire/pkg/transport"
"github.com/skycoin/skywire/pkg/transport/dmsg"
)

Expand Down Expand Up @@ -141,101 +138,6 @@ func TestCloseLoop(t *testing.T) {
require.NoError(t, errWithTimeout(srvErrCh))
}

type mockNode struct {
sync.Mutex
rules map[routing.RouteID]routing.Rule
messenger *dmsg.Client
}

func newMockNode(messenger *dmsg.Client) *mockNode {
return &mockNode{messenger: messenger, rules: make(map[routing.RouteID]routing.Rule)}
}

func (n *mockNode) serve() {
ctx := context.Background()

for {
tp, _ := n.messenger.Accept(ctx) // nolint:errcheck
go func(tp transport.Transport) {
for {
n.serveTransport(tp) // nolint:errcheck
}
}(tp)
}
}

func (n *mockNode) setRule(id routing.RouteID, rule routing.Rule) {
n.Lock()
n.rules[id] = rule
n.Unlock()
}

func (n *mockNode) getRules() map[routing.RouteID]routing.Rule {
res := make(map[routing.RouteID]routing.Rule)
n.Lock()
for id, rule := range n.rules {
res[id] = rule
}
n.Unlock()
return res
}

func (n *mockNode) serveTransport(tr transport.Transport) error {
proto := NewSetupProtocol(tr)
sp, data, err := proto.ReadPacket()
if err != nil {
return err
}

n.Lock()
var res interface{}
switch sp {
case PacketAddRules:
rules := []routing.Rule{}
json.Unmarshal(data, &rules) // nolint: errcheck
for _, rule := range rules {
for i := routing.RouteID(1); i < 255; i++ {
if n.rules[i] == nil {
n.rules[i] = rule
res = []routing.RouteID{i}
break
}
}
}
case PacketConfirmLoop:
ld := LoopData{}
json.Unmarshal(data, &ld) // nolint: errcheck
for _, rule := range n.rules {
if rule.Type() == routing.RuleApp && rule.RemotePK() == ld.RemotePK &&
rule.RemotePort() == ld.RemotePort && rule.LocalPort() == ld.LocalPort {

rule.SetRouteID(ld.RouteID)
break
}
}
case PacketLoopClosed:
ld := &LoopData{}
json.Unmarshal(data, ld) // nolint: errcheck
for routeID, rule := range n.rules {
if rule.Type() == routing.RuleApp && rule.RemotePK() == ld.RemotePK &&
rule.RemotePort() == ld.RemotePort && rule.LocalPort() == ld.LocalPort {

delete(n.rules, routeID)
break
}
}
default:
err = errors.New("unknown foundation packet")
}
n.Unlock()

if err != nil {
return proto.WritePacket(RespFailure, err)
}

return proto.WritePacket(RespSuccess, res)
}

func createServer(dc disc.APIClient) (srv *dmsg.Server, srvErr <-chan error, err error) {
pk, sk := cipher.GenerateKeyPair()

Expand Down
2 changes: 2 additions & 0 deletions pkg/transport/dmsg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"github.com/skycoin/dmsg/disc"
)

// Server is an alias for dmsg.Server
type Server = dmsg.Server

// NewServer is an alias for dmsg.NewServer
func NewServer(pk cipher.PubKey, sk cipher.SecKey, addr string, l net.Listener, dc disc.APIClient) (*Server, error) {
return dmsg.NewServer(pk, sk, addr, l, dc)
}
4 changes: 2 additions & 2 deletions pkg/transport/handshake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func newHsMockEnv() *hsMockEnv {
tr1 := NewMockTransport(in, pk1, pk2)
tr2 := NewMockTransport(out, pk2, pk1)

m1, err1 := NewManager(&ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client})
m2, err2 := NewManager(&ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client})
m1, err1 := NewManager(&ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client}, nil)
m2, err2 := NewManager(&ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client}, nil)

return &hsMockEnv{
client: client,
Expand Down
30 changes: 25 additions & 5 deletions pkg/transport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,13 @@ type Manager struct {
mu sync.RWMutex

mgrQty int32 // Count of spawned manageTransport goroutines

setupNodes []cipher.PubKey
}

// NewManager creates a Manager with the provided configuration and transport factories.
// 'factories' should be ordered by preference.
func NewManager(config *ManagerConfig, factories ...Factory) (*Manager, error) {
func NewManager(config *ManagerConfig, setupNodes []cipher.PubKey, factories ...Factory) (*Manager, error) {
entries, _ := config.DiscoveryClient.GetTransportsByEdge(context.Background(), config.PubKey) // nolint

mEntries := make(map[Entry]struct{})
Expand All @@ -62,6 +64,7 @@ func NewManager(config *ManagerConfig, factories ...Factory) (*Manager, error) {
entries: mEntries,
TrChan: make(chan *ManagedTransport, 9), // TODO: eliminate or justify buffering here
doneChan: make(chan struct{}),
setupNodes: setupNodes,
}, nil
}

Expand Down Expand Up @@ -264,10 +267,15 @@ func (tm *Manager) dialTransport(ctx context.Context, factory Factory, remote ci
return nil, nil, err
}

entry, err := settlementInitiatorHandshake(public).Do(tm, tr, time.Minute)
if err != nil {
tr.Close()
return nil, nil, err
var entry *Entry
if tm.IsSetupTransport(tr) {
entry = makeEntry(tr, public)
} else {
entry, err = settlementInitiatorHandshake(public).Do(tm, tr, time.Minute)
if err != nil {
tr.Close()
return nil, nil, err
}
}

return tr, entry, nil
Expand Down Expand Up @@ -437,3 +445,15 @@ func (tm *Manager) manageTransport(ctx context.Context, mTr *ManagedTransport, f
}
}
}

// IsSetupTransport checks whether `tr` is running in the `setup` mode.
func (tm *Manager) IsSetupTransport(tr Transport) bool {
for _, pk := range tm.setupNodes {
remote, ok := tm.Remote(tr.Edges())
if ok && (remote == pk) {
return true
}
}

return false
}
14 changes: 7 additions & 7 deletions pkg/transport/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestTransportManager(t *testing.T) {
c2 := &ManagerConfig{pk2, sk2, client, logStore, nil}

f1, f2 := NewMockFactoryPair(pk1, pk2)
m1, err := NewManager(c1, f1)
m1, err := NewManager(c1, nil, f1)
require.NoError(t, err)

assert.Equal(t, []string{"mock"}, m1.Factories())
Expand All @@ -53,7 +53,7 @@ func TestTransportManager(t *testing.T) {
errCh <- m1.Serve(context.TODO())
}()

m2, err := NewManager(c2, f2)
m2, err := NewManager(c2, nil, f2)
require.NoError(t, err)

var mu sync.Mutex
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestTransportManagerReEstablishTransports(t *testing.T) {
c2 := &ManagerConfig{pk2, sk2, client, logStore, nil}

f1, f2 := NewMockFactoryPair(pk1, pk2)
m1, err := NewManager(c1, f1)
m1, err := NewManager(c1, nil, f1)
require.NoError(t, err)
assert.Equal(t, []string{"mock"}, m1.Factories())

Expand All @@ -147,7 +147,7 @@ func TestTransportManagerReEstablishTransports(t *testing.T) {
m1errCh := make(chan error, 1)
go func() { m1errCh <- m1.Serve(context.TODO()) }()

m2, err := NewManager(c2, f2)
m2, err := NewManager(c2, nil, f2)
require.NoError(t, err)

tr2, err := m2.CreateTransport(context.TODO(), pk1, "mock", true)
Expand All @@ -167,7 +167,7 @@ func TestTransportManagerReEstablishTransports(t *testing.T) {
require.NoError(t, err)
assert.False(t, dEntry2.IsUp)

m2, err = NewManager(c2, f2)
m2, err = NewManager(c2, nil, f2)
require.NoError(t, err)

m2.reconnectTransports(context.TODO())
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestTransportManagerLogs(t *testing.T) {
c2 := &ManagerConfig{pk2, sk2, client, logStore2, nil}

f1, f2 := NewMockFactoryPair(pk1, pk2)
m1, err := NewManager(c1, f1)
m1, err := NewManager(c1, nil, f1)
require.NoError(t, err)

assert.Equal(t, []string{"mock"}, m1.Factories())
Expand All @@ -210,7 +210,7 @@ func TestTransportManagerLogs(t *testing.T) {
errCh <- m1.Serve(context.TODO())
}()

m2, err := NewManager(c2, f2)
m2, err := NewManager(c2, nil, f2)
require.NoError(t, err)

tr2, err := m2.CreateTransport(context.TODO(), pk1, "mock", true)
Expand Down
Loading

0 comments on commit 3b67aa1

Please sign in to comment.