diff --git a/pkg/router/router_test.go b/pkg/router/router_test.go index 2475c6615..766c5192e 100644 --- a/pkg/router/router_test.go +++ b/pkg/router/router_test.go @@ -38,7 +38,7 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -// TODO(evanlinjin): Fix this test. +// TODO(evanlinjin): Fix test. //func TestRouterForwarding(t *testing.T) { // client := transport.NewDiscoveryMock() // logStore := transport.InMemoryTransportLogStore() @@ -56,17 +56,17 @@ func TestMain(m *testing.M) { // f3.SetType("mock2") // f4.SetType("mock2") // -// m1, err := transport.NewManager(c1, f1) +// m1, err := transport.NewManager(c1, nil, f1) // require.NoError(t, err) -// go func() { _ = m1.Serve(context.TODO()) }() +// go func() { _ = m1.Serve(context.TODO()) }() //nolint:errcheck // -// m2, err := transport.NewManager(c2, f2, f3) +// m2, err := transport.NewManager(c2, nil, f2, f3) // require.NoError(t, err) -// go func() { _ = m2.Serve(context.TODO()) }() +// go func() { _ = m2.Serve(context.TODO()) }() //nolint:errcheck // -// m3, err := transport.NewManager(c3, f4) +// m3, err := transport.NewManager(c3, nil, f4) // require.NoError(t, err) -// go func() { _ = m3.Serve(context.TODO()) }() +// go func() { _ = m3.Serve(context.TODO()) }() //nolint:errcheck // // rt := routing.InMemoryRoutingTable() // conf := &Config{ @@ -118,8 +118,9 @@ func TestRouterAppInit(t *testing.T) { pk1, sk1 := cipher.GenerateKeyPair() c1 := &transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore} - m1, err := transport.NewManager(c1) + m1, err := transport.NewManager(c1, nil) require.NoError(t, err) + go func() { _ = m1.Serve(context.TODO()) }() //nolint:errcheck conf := &Config{ Logger: logging.MustGetLogger("routesetup"), @@ -129,9 +130,10 @@ func TestRouterAppInit(t *testing.T) { } r := New(conf) rw, rwIn := net.Pipe() - errCh := make(chan error) + errCh := make(chan error, 1) go func() { errCh <- r.ServeApp(rwIn, 10, &app.Config{AppName: "foo", AppVersion: "0.0.1"}) + close(errCh) }() proto := app.NewProtocol(rw) @@ -149,7 +151,7 @@ func TestRouterAppInit(t *testing.T) { require.NoError(t, <-errCh) } -// TODO(evanlinjin): Fix this test. +// TODO(evanlinjin): Figure out what this is testing and fix it. //func TestRouterApp(t *testing.T) { // client := transport.NewDiscoveryMock() // logStore := transport.InMemoryTransportLogStore() @@ -162,11 +164,11 @@ func TestRouterAppInit(t *testing.T) { // // f1, f2 := transport.NewMockFactoryPair(pk1, pk2) // -// m1, err := transport.NewManager(c1, f1) +// m1, err := transport.NewManager(c1, nil, f1) // require.NoError(t, err) -// go func() {_ = m1.Serve(context.TODO())}() +// //go func() {_ = m1.Serve(context.TODO())}() // -// m2, err := transport.NewManager(c2, f2) +// m2, err := transport.NewManager(c2, nil, f2) // require.NoError(t, err) // go func() {_ = m2.Serve(context.TODO())}() // @@ -253,7 +255,7 @@ func TestRouterLocalApp(t *testing.T) { logStore := transport.InMemoryTransportLogStore() pk, sk := cipher.GenerateKeyPair() - m, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk, SecKey: sk, DiscoveryClient: client, LogStore: logStore}) + m, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk, SecKey: sk, DiscoveryClient: client, LogStore: logStore}, nil) require.NoError(t, err) conf := &Config{ @@ -335,13 +337,12 @@ func TestRouterSetup(t *testing.T) { c2 := &transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore} f1, f2 := transport.NewMockFactoryPair(pk1, pk2) - m1, err := transport.NewManager(c1, f1) + m1, err := transport.NewManager(c1, []cipher.PubKey{pk2}, f1) require.NoError(t, err) - m2, err := transport.NewManager(c2, f2) + m2, err := transport.NewManager(c2, nil, f2) require.NoError(t, err) - - m1.SetSetupPKs([]cipher.PubKey{pk2}) + go func() { _ = m2.Serve(context.TODO()) }() //nolint:errcheck rt := routing.InMemoryRoutingTable() conf := &Config{ @@ -537,17 +538,22 @@ func TestRouterSetupLoop(t *testing.T) { f1.SetType(dmsg.Type) f2.SetType(dmsg.Type) - m1, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore}, f1) + m1, err := transport.NewManager( + &transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore}, + []cipher.PubKey{pk2}, + f1) require.NoError(t, err) - m1.SetSetupPKs([]cipher.PubKey{pk2}) - m2, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}, f2) + m2, err := transport.NewManager( + &transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}, + []cipher.PubKey{pk1}, + f2) require.NoError(t, err) - m2.SetSetupPKs([]cipher.PubKey{pk1}) serveErrCh := make(chan error, 1) go func() { serveErrCh <- m2.Serve(context.TODO()) + close(serveErrCh) }() conf := &Config{ @@ -666,13 +672,17 @@ func TestRouterCloseLoop(t *testing.T) { f1, f2 := transport.NewMockFactoryPair(pk1, pk2) f1.SetType(dmsg.Type) - m1, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore}, f1) + m1, err := transport.NewManager( + &transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore}, + []cipher.PubKey{pk2}, + f1) require.NoError(t, err) - m1.SetSetupPKs([]cipher.PubKey{pk2}) - m2, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}, f2) + m2, err := transport.NewManager( + &transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}, + []cipher.PubKey{pk1}, + f2) require.NoError(t, err) - m2.SetSetupPKs([]cipher.PubKey{pk1}) serveErrCh := make(chan error, 1) go func() { @@ -773,13 +783,17 @@ func TestRouterCloseLoopOnAppClose(t *testing.T) { f1, f2 := transport.NewMockFactoryPair(pk1, pk2) f1.SetType(dmsg.Type) - m1, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore}, f1) + m1, err := transport.NewManager( + &transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore}, + []cipher.PubKey{pk2}, + f1) require.NoError(t, err) - m1.SetSetupPKs([]cipher.PubKey{pk2}) - m2, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}, f2) + m2, err := transport.NewManager( + &transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}, + []cipher.PubKey{pk1}, + f2) require.NoError(t, err) - m2.SetSetupPKs([]cipher.PubKey{pk1}) serveErrCh := make(chan error, 1) go func() { @@ -872,8 +886,9 @@ func TestRouterRouteExpiration(t *testing.T) { logStore := transport.InMemoryTransportLogStore() pk, sk := cipher.GenerateKeyPair() - m, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk, SecKey: sk, DiscoveryClient: client, LogStore: logStore}) + m, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk, SecKey: sk, DiscoveryClient: client, LogStore: logStore}, nil) require.NoError(t, err) + go func() { _ = m.Serve(context.TODO()) }() //nolint:errcheck rt := routing.InMemoryRoutingTable() _, err = rt.AddRule(routing.AppRule(time.Now().Add(-time.Hour), 4, pk, 6, 5)) diff --git a/pkg/transport/handshake_test.go b/pkg/transport/handshake_test.go index e442ac1e0..134346224 100644 --- a/pkg/transport/handshake_test.go +++ b/pkg/transport/handshake_test.go @@ -34,8 +34,8 @@ func newHsMockEnv() *hsMockEnv { tr1 := NewMockTransport(in, pk1, pk2) tr2 := NewMockTransport(out, pk2, pk1) - m1, err1 := NewManager(&ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client}) - m2, err2 := NewManager(&ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client}) + m1, err1 := NewManager(&ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client}, nil) + m2, err2 := NewManager(&ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client}, nil) return &hsMockEnv{ client: client, diff --git a/pkg/transport/managed_transport.go b/pkg/transport/managed_transport.go index 84654371a..9f3bc22bb 100644 --- a/pkg/transport/managed_transport.go +++ b/pkg/transport/managed_transport.go @@ -247,10 +247,16 @@ func (mt *ManagedTransport) setIfConnNil(ctx context.Context, conn Transport) er return ErrConnAlreadyExists } - if _, err := mt.dc.UpdateStatuses(ctx, &Status{ID: mt.Entry.ID, IsUp: true}); err != nil { - mt.log.Warnf("Failed to update transport status: %s", err) + var err error + for i := 0; i < 3; i++ { + if _, err = mt.dc.UpdateStatuses(ctx, &Status{ID: mt.Entry.ID, IsUp: true}); err != nil { + mt.log.Warnf("Failed to update transport status: %s, retrying...", err) + continue + } + mt.log.Infoln("Status updated: UP") + break } - mt.log.Infoln("Status updated: UP") + mt.conn = conn select { case mt.connCh <- struct{}{}: @@ -260,11 +266,14 @@ func (mt *ManagedTransport) setIfConnNil(ctx context.Context, conn Transport) er } func (mt *ManagedTransport) clearConn(ctx context.Context) { + if mt.conn != nil { + _ = mt.conn.Close() //nolint:errcheck + mt.conn = nil + } if _, err := mt.dc.UpdateStatuses(ctx, &Status{ID: mt.Entry.ID, IsUp: false}); err != nil { mt.log.Warnf("Failed to update transport status: %s", err) } mt.log.Infoln("Status updated: DOWN") - mt.conn = nil } // WritePacket writes a packet to the remote. diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index b06d115e1..f63311074 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -39,50 +39,28 @@ type Manager struct { // NewManager creates a Manager with the provided configuration and transport factories. // 'factories' should be ordered by preference. -func NewManager(config *ManagerConfig, factories ...Factory) (*Manager, error) { - log := logging.MustGetLogger("tp_manager") - ctx := context.Background() - - done := make(chan struct{}) - - fMap := make(map[string]Factory) - for _, factory := range factories { - fMap[factory.Type()] = factory - } - - entries, err := config.DiscoveryClient.GetTransportsByEdge(ctx, config.PubKey) - if err != nil { - log.Warnf("No transports found for local node: %v", err) +func NewManager(config *ManagerConfig, setupPKs []cipher.PubKey, factories ...Factory) (*Manager, error) { + tm := &Manager{ + Logger: logging.MustGetLogger("tp_manager"), + conf: config, + setupPKS: setupPKs, + facs: make(map[string]Factory), + tps: make(map[uuid.UUID]*ManagedTransport), + setupCh: make(chan Transport, 9), // TODO: eliminate or justify buffering here + readCh: make(chan routing.Packet, 20), + done: make(chan struct{}), } - - rCh := make(chan routing.Packet, 20) - tpMap := make(map[uuid.UUID]*ManagedTransport) - for _, entry := range entries { - fac, ok := fMap[entry.Entry.Type] - if !ok { - log.Warnf("cannot revive transport entry: factory of type '%s' not supported", entry.Entry.Type) - continue - } - mTp := NewManagedTransport(fac, config.DiscoveryClient, config.LogStore, entry.Entry.RemoteEdge(config.PubKey), config.SecKey) - go mTp.Serve(rCh, done) - tpMap[entry.Entry.ID] = mTp + for _, factory := range factories { + tm.facs[factory.Type()] = factory } - - return &Manager{ - Logger: log, - conf: config, - facs: fMap, - tps: tpMap, - setupCh: make(chan Transport, 9), // TODO: eliminate or justify buffering here - readCh: rCh, - done: done, - }, nil + return tm, nil } // Serve runs listening loop across all registered factories. func (tm *Manager) Serve(ctx context.Context) error { - tm.initDefaultTransports(ctx) - tm.Logger.Infof("Default transports created.") + tm.mx.Lock() + tm.initTransports(ctx) + tm.mx.Unlock() var wg sync.WaitGroup for _, factory := range tm.facs { @@ -112,24 +90,19 @@ func (tm *Manager) Serve(ctx context.Context) error { return nil } -// initDefaultTransports created transports to DefaultNodes if they don't exist. -func (tm *Manager) initDefaultTransports(ctx context.Context) { - for _, pk := range tm.conf.DefaultNodes { - pk := pk - exist := false - tm.WalkTransports(func(tr *ManagedTransport) bool { - if tr.Remote() == pk { - exist = true - return false - } - return true - }) - if exist { - continue - } - _, err := tm.SaveTransport(ctx, pk, "dmsg") - if err != nil { - tm.Logger.Warnf("Failed to establish transport to a node %s: %s", pk, err) +func (tm *Manager) initTransports(ctx context.Context) { + entries, err := tm.conf.DiscoveryClient.GetTransportsByEdge(ctx, tm.conf.PubKey) + if err != nil { + log.Warnf("No transports found for local node: %v", err) + } + for _, entry := range entries { + var ( + tpType = entry.Entry.Type + remote = entry.Entry.RemoteEdge(tm.conf.PubKey) + tpID = entry.Entry.ID + ) + if _, err := tm.saveTransport(remote, tpType); err != nil { + tm.Logger.Warnf("INIT: failed to init tp: type(%s) remote(%s) tpID(%s)", tpType, remote, tpID) } } } @@ -181,7 +154,17 @@ func (tm *Manager) SaveTransport(ctx context.Context, remote cipher.PubKey, tpTy if tm.isClosing() { return nil, io.ErrClosedPipe } + mTp, err := tm.saveTransport(remote, tpType) + if err != nil { + return nil, err + } + if err := mTp.Dial(ctx); err != nil { + tm.Logger.Warnf("underlying 'write' tp failed, will retry: %v", err) + } + return mTp, nil +} +func (tm *Manager) saveTransport(remote cipher.PubKey, tpType string) (*ManagedTransport, error) { factory, ok := tm.facs[tpType] if !ok { return nil, errors.New("unknown transport type") @@ -195,9 +178,6 @@ func (tm *Manager) SaveTransport(ctx context.Context, remote cipher.PubKey, tpTy } mTp := NewManagedTransport(factory, tm.conf.DiscoveryClient, tm.conf.LogStore, remote, tm.conf.SecKey) - if err := mTp.Dial(ctx); err != nil { - tm.Logger.Warnf("underlying 'write' tp failed, will retry: %v", err) - } go mTp.Serve(tm.readCh, tm.done) tm.tps[tpID] = mTp @@ -235,10 +215,7 @@ func (tm *Manager) ReadPacket() (routing.Packet, error) { // SetupPKs returns setup node list contained within the TransportManager. func (tm *Manager) SetupPKs() []cipher.PubKey { - tm.mx.RLock() - pks := tm.setupPKS - tm.mx.RUnlock() - return pks + return tm.setupPKS } // IsSetupPK checks whether provided `pk` is of `setup` purpose. @@ -251,13 +228,6 @@ func (tm *Manager) IsSetupPK(pk cipher.PubKey) bool { return false } -// SetSetupPKs sets setup node list contained within the TransportManager. -func (tm *Manager) SetSetupPKs(nodes []cipher.PubKey) { - tm.mx.Lock() - tm.setupPKS = nodes - tm.mx.Unlock() -} - // DialSetupConn dials to a remote setup node. func (tm *Manager) DialSetupConn(ctx context.Context, remote cipher.PubKey, tpType string) (Transport, error) { tm.mx.Lock() diff --git a/pkg/transport/manager_test.go b/pkg/transport/manager_test.go index 2275bff98..0d01dc90d 100644 --- a/pkg/transport/manager_test.go +++ b/pkg/transport/manager_test.go @@ -47,7 +47,7 @@ func TestNewManager(t *testing.T) { ls1 := transport.InMemoryTransportLogStore() c1 := &transport.ManagerConfig{pk1, sk1, tpDisc, ls1, nil} f1 := dmsgEnv.Clients[0] - m1, err := transport.NewManager(c1, f1) + m1, err := transport.NewManager(c1, nil, f1) require.NoError(t, err) m1Err := make(chan error, 1) go func() { @@ -65,7 +65,7 @@ func TestNewManager(t *testing.T) { ls2 := transport.InMemoryTransportLogStore() c2 := &transport.ManagerConfig{pk2, sk2, tpDisc, ls2, nil} f2 := dmsgEnv.Clients[1] - m2, err := transport.NewManager(c2, f2) + m2, err := transport.NewManager(c2, nil, f2) require.NoError(t, err) m2Err := make(chan error, 1) go func() { diff --git a/pkg/transport/mock.go b/pkg/transport/mock.go index e9598e781..f57360629 100644 --- a/pkg/transport/mock.go +++ b/pkg/transport/mock.go @@ -50,12 +50,14 @@ func (f *MockFactory) SetType(fType string) { func (f *MockFactory) Accept(ctx context.Context) (Transport, error) { select { case conn, ok := <-f.in: - if ok { - return NewMockTransport(conn, f.local, conn.PubKey), nil + if !ok { + return nil, errors.New("factory: closed") } + return NewMockTransport(conn, f.local, conn.PubKey), nil + case <-f.inDone: + return nil, errors.New("factory: closed") } - return nil, errors.New("factory: closed") } // Dial creates pair of net.Conn via net.Pipe and passes one end to another MockFactory. @@ -177,10 +179,10 @@ func MockTransportManagersPair() (pk1, pk2 cipher.PubKey, m1, m2 *Manager, errCh f1, f2 := NewMockFactoryPair(pk1, pk2) - if m1, err = NewManager(c1, f1); err != nil { + if m1, err = NewManager(c1, nil, f1); err != nil { return } - if m2, err = NewManager(c2, f2); err != nil { + if m2, err = NewManager(c2, nil, f2); err != nil { return } diff --git a/pkg/visor/visor.go b/pkg/visor/visor.go index e6c3ee909..5806fd810 100644 --- a/pkg/visor/visor.go +++ b/pkg/visor/visor.go @@ -141,12 +141,10 @@ func NewNode(config *Config, masterLogger *logging.MasterLogger) (*Node, error) LogStore: logStore, DefaultNodes: config.TrustedNodes, } - node.tm, err = transport.NewManager(tmConfig, node.messenger) + node.tm, err = transport.NewManager(tmConfig, config.Routing.SetupNodes, node.messenger) if err != nil { return nil, fmt.Errorf("transport manager: %s", err) } - node.tm.Logger = node.Logger.PackageLogger("trmanager") - node.tm.SetSetupPKs(config.Routing.SetupNodes) node.rt, err = config.RoutingTable() if err != nil { diff --git a/pkg/visor/visor_test.go b/pkg/visor/visor_test.go index a712e4fcb..f9a76c98a 100644 --- a/pkg/visor/visor_test.go +++ b/pkg/visor/visor_test.go @@ -98,7 +98,7 @@ func TestNodeStartClose(t *testing.T) { var err error tmConf := &transport.ManagerConfig{PubKey: cipher.PubKey{}, DiscoveryClient: transport.NewDiscoveryMock()} - node.tm, err = transport.NewManager(tmConf, node.messenger) + node.tm, err = transport.NewManager(tmConf, nil, node.messenger) require.NoError(t, err) errCh := make(chan error)