Further fixes for transport management.
* The transport status update now retries multiple times on failure (a workaround for a possible race condition in transport discovery).

* transport.NewManager() now takes setupPKs directly instead of exposing a separate setter (see the sketch below).

* Various fixes to initTransports logic.

* Fixed various tests.
Evan Lin committed Aug 14, 2019
1 parent df51aa9 commit 8b70a95
Showing 8 changed files with 112 additions and 118 deletions.
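
For reference, a minimal sketch of the new constructor usage follows. This is not code from the commit: the import paths are assumed from the skywire repository layout, and the mock discovery client and in-memory log store are borrowed from the test code in this diff.

package main

import (
	"context"
	"log"

	// Import paths are assumptions based on the skywire layout; adjust as needed.
	"github.com/skycoin/dmsg/cipher"
	"github.com/skycoin/skywire/pkg/transport"
)

func main() {
	pk, sk := cipher.GenerateKeyPair()
	setupPK, _ := cipher.GenerateKeyPair()

	conf := &transport.ManagerConfig{
		PubKey:          pk,
		SecKey:          sk,
		DiscoveryClient: transport.NewDiscoveryMock(),
		LogStore:        transport.InMemoryTransportLogStore(),
	}

	// Previously the setup PKs were configured after construction via
	// m.SetSetupPKs([]cipher.PubKey{setupPK}); now they are passed to the
	// constructor, and nil means "no setup nodes".
	m, err := transport.NewManager(conf, []cipher.PubKey{setupPK})
	if err != nil {
		log.Fatal(err)
	}
	if err := m.Serve(context.TODO()); err != nil {
		log.Fatal(err)
	}
}

Passing nil for setupPKs, as most of the updated tests do, simply configures no setup nodes.
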
77 changes: 46 additions & 31 deletions pkg/router/router_test.go
@@ -38,7 +38,7 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}

// TODO(evanlinjin): Fix this test.
// TODO(evanlinjin): Fix test.
//func TestRouterForwarding(t *testing.T) {
// client := transport.NewDiscoveryMock()
// logStore := transport.InMemoryTransportLogStore()
@@ -56,17 +56,17 @@ func TestMain(m *testing.M) {
// f3.SetType("mock2")
// f4.SetType("mock2")
//
// m1, err := transport.NewManager(c1, f1)
// m1, err := transport.NewManager(c1, nil, f1)
// require.NoError(t, err)
// go func() { _ = m1.Serve(context.TODO()) }()
// go func() { _ = m1.Serve(context.TODO()) }() //nolint:errcheck
//
// m2, err := transport.NewManager(c2, f2, f3)
// m2, err := transport.NewManager(c2, nil, f2, f3)
// require.NoError(t, err)
// go func() { _ = m2.Serve(context.TODO()) }()
// go func() { _ = m2.Serve(context.TODO()) }() //nolint:errcheck
//
// m3, err := transport.NewManager(c3, f4)
// m3, err := transport.NewManager(c3, nil, f4)
// require.NoError(t, err)
// go func() { _ = m3.Serve(context.TODO()) }()
// go func() { _ = m3.Serve(context.TODO()) }() //nolint:errcheck
//
// rt := routing.InMemoryRoutingTable()
// conf := &Config{
@@ -118,8 +118,9 @@ func TestRouterAppInit(t *testing.T) {
pk1, sk1 := cipher.GenerateKeyPair()
c1 := &transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore}

m1, err := transport.NewManager(c1)
m1, err := transport.NewManager(c1, nil)
require.NoError(t, err)
go func() { _ = m1.Serve(context.TODO()) }() //nolint:errcheck

conf := &Config{
Logger: logging.MustGetLogger("routesetup"),
@@ -129,9 +130,10 @@ }
}
r := New(conf)
rw, rwIn := net.Pipe()
errCh := make(chan error)
errCh := make(chan error, 1)
go func() {
errCh <- r.ServeApp(rwIn, 10, &app.Config{AppName: "foo", AppVersion: "0.0.1"})
close(errCh)
}()

proto := app.NewProtocol(rw)
@@ -149,7 +151,7 @@ func TestRouterAppInit(t *testing.T) {
require.NoError(t, <-errCh)
}

// TODO(evanlinjin): Fix this test.
// TODO(evanlinjin): Figure out what this is testing and fix it.
//func TestRouterApp(t *testing.T) {
// client := transport.NewDiscoveryMock()
// logStore := transport.InMemoryTransportLogStore()
@@ -162,11 +164,11 @@ func TestRouterAppInit(t *testing.T) {
//
// f1, f2 := transport.NewMockFactoryPair(pk1, pk2)
//
// m1, err := transport.NewManager(c1, f1)
// m1, err := transport.NewManager(c1, nil, f1)
// require.NoError(t, err)
// go func() {_ = m1.Serve(context.TODO())}()
// //go func() {_ = m1.Serve(context.TODO())}()
//
// m2, err := transport.NewManager(c2, f2)
// m2, err := transport.NewManager(c2, nil, f2)
// require.NoError(t, err)
// go func() {_ = m2.Serve(context.TODO())}()
//
@@ -253,7 +255,7 @@ func TestRouterLocalApp(t *testing.T) {
logStore := transport.InMemoryTransportLogStore()

pk, sk := cipher.GenerateKeyPair()
m, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk, SecKey: sk, DiscoveryClient: client, LogStore: logStore})
m, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk, SecKey: sk, DiscoveryClient: client, LogStore: logStore}, nil)
require.NoError(t, err)

conf := &Config{
@@ -335,13 +337,12 @@ func TestRouterSetup(t *testing.T) {
c2 := &transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}

f1, f2 := transport.NewMockFactoryPair(pk1, pk2)
m1, err := transport.NewManager(c1, f1)
m1, err := transport.NewManager(c1, []cipher.PubKey{pk2}, f1)
require.NoError(t, err)

m2, err := transport.NewManager(c2, f2)
m2, err := transport.NewManager(c2, nil, f2)
require.NoError(t, err)

m1.SetSetupPKs([]cipher.PubKey{pk2})
go func() { _ = m2.Serve(context.TODO()) }() //nolint:errcheck

rt := routing.InMemoryRoutingTable()
conf := &Config{
Expand Down Expand Up @@ -537,17 +538,22 @@ func TestRouterSetupLoop(t *testing.T) {
f1.SetType(dmsg.Type)
f2.SetType(dmsg.Type)

m1, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore}, f1)
m1, err := transport.NewManager(
&transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore},
[]cipher.PubKey{pk2},
f1)
require.NoError(t, err)
m1.SetSetupPKs([]cipher.PubKey{pk2})

m2, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}, f2)
m2, err := transport.NewManager(
&transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore},
[]cipher.PubKey{pk1},
f2)
require.NoError(t, err)
m2.SetSetupPKs([]cipher.PubKey{pk1})

serveErrCh := make(chan error, 1)
go func() {
serveErrCh <- m2.Serve(context.TODO())
close(serveErrCh)
}()

conf := &Config{
@@ -666,13 +672,17 @@ func TestRouterCloseLoop(t *testing.T) {
f1, f2 := transport.NewMockFactoryPair(pk1, pk2)
f1.SetType(dmsg.Type)

m1, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore}, f1)
m1, err := transport.NewManager(
&transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore},
[]cipher.PubKey{pk2},
f1)
require.NoError(t, err)
m1.SetSetupPKs([]cipher.PubKey{pk2})

m2, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}, f2)
m2, err := transport.NewManager(
&transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore},
[]cipher.PubKey{pk1},
f2)
require.NoError(t, err)
m2.SetSetupPKs([]cipher.PubKey{pk1})

serveErrCh := make(chan error, 1)
go func() {
@@ -773,13 +783,17 @@ func TestRouterCloseLoopOnAppClose(t *testing.T) {
f1, f2 := transport.NewMockFactoryPair(pk1, pk2)
f1.SetType(dmsg.Type)

m1, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore}, f1)
m1, err := transport.NewManager(
&transport.ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client, LogStore: logStore},
[]cipher.PubKey{pk2},
f1)
require.NoError(t, err)
m1.SetSetupPKs([]cipher.PubKey{pk2})

m2, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore}, f2)
m2, err := transport.NewManager(
&transport.ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client, LogStore: logStore},
[]cipher.PubKey{pk1},
f2)
require.NoError(t, err)
m2.SetSetupPKs([]cipher.PubKey{pk1})

serveErrCh := make(chan error, 1)
go func() {
@@ -872,8 +886,9 @@ func TestRouterRouteExpiration(t *testing.T) {
logStore := transport.InMemoryTransportLogStore()

pk, sk := cipher.GenerateKeyPair()
m, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk, SecKey: sk, DiscoveryClient: client, LogStore: logStore})
m, err := transport.NewManager(&transport.ManagerConfig{PubKey: pk, SecKey: sk, DiscoveryClient: client, LogStore: logStore}, nil)
require.NoError(t, err)
go func() { _ = m.Serve(context.TODO()) }() //nolint:errcheck

rt := routing.InMemoryRoutingTable()
_, err = rt.AddRule(routing.AppRule(time.Now().Add(-time.Hour), 4, pk, 6, 5))
4 changes: 2 additions & 2 deletions pkg/transport/handshake_test.go
@@ -34,8 +34,8 @@ func newHsMockEnv() *hsMockEnv {
tr1 := NewMockTransport(in, pk1, pk2)
tr2 := NewMockTransport(out, pk2, pk1)

m1, err1 := NewManager(&ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client})
m2, err2 := NewManager(&ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client})
m1, err1 := NewManager(&ManagerConfig{PubKey: pk1, SecKey: sk1, DiscoveryClient: client}, nil)
m2, err2 := NewManager(&ManagerConfig{PubKey: pk2, SecKey: sk2, DiscoveryClient: client}, nil)

return &hsMockEnv{
client: client,
17 changes: 13 additions & 4 deletions pkg/transport/managed_transport.go
@@ -247,10 +247,16 @@ func (mt *ManagedTransport) setIfConnNil(ctx context.Context, conn Transport) er
return ErrConnAlreadyExists
}

if _, err := mt.dc.UpdateStatuses(ctx, &Status{ID: mt.Entry.ID, IsUp: true}); err != nil {
mt.log.Warnf("Failed to update transport status: %s", err)
var err error
for i := 0; i < 3; i++ {
if _, err = mt.dc.UpdateStatuses(ctx, &Status{ID: mt.Entry.ID, IsUp: true}); err != nil {
mt.log.Warnf("Failed to update transport status: %s, retrying...", err)
continue
}
mt.log.Infoln("Status updated: UP")
break
}
mt.log.Infoln("Status updated: UP")

mt.conn = conn
select {
case mt.connCh <- struct{}{}:
@@ -260,11 +266,14 @@ func (mt *ManagedTransport) setIfConnNil(ctx context.Context, conn Transport) er
}

func (mt *ManagedTransport) clearConn(ctx context.Context) {
if mt.conn != nil {
_ = mt.conn.Close() //nolint:errcheck
mt.conn = nil
}
if _, err := mt.dc.UpdateStatuses(ctx, &Status{ID: mt.Entry.ID, IsUp: false}); err != nil {
mt.log.Warnf("Failed to update transport status: %s", err)
}
mt.log.Infoln("Status updated: DOWN")
mt.conn = nil
}

// WritePacket writes a packet to the remote.
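
The status-update retry added above is a plain three-attempt loop with no delay between attempts. As a design note, a sketch of the same idea with a short backoff is shown below; the helper name, the statusUpdater interface, and the delays are illustrative assumptions, not part of this commit.

package transportutil

import (
	"context"
	"fmt"
	"time"
)

// statusUpdater is an assumed stand-in for the transport-discovery client's
// UpdateStatuses call used by ManagedTransport.
type statusUpdater interface {
	UpdateStatus(ctx context.Context, isUp bool) error
}

// updateStatusWithRetry retries the discovery status update a few times with a
// short, growing pause between attempts, mirroring the workaround for the
// suspected race condition in transport discovery.
func updateStatusWithRetry(ctx context.Context, dc statusUpdater, isUp bool) error {
	const attempts = 3
	var err error
	for i := 0; i < attempts; i++ {
		if err = dc.UpdateStatus(ctx, isUp); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Duration(i+1) * 100 * time.Millisecond):
		}
	}
	return fmt.Errorf("status update failed after %d attempts: %v", attempts, err)
}
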
110 changes: 40 additions & 70 deletions pkg/transport/manager.go
@@ -39,50 +39,28 @@ type Manager struct {

// NewManager creates a Manager with the provided configuration and transport factories.
// 'factories' should be ordered by preference.
func NewManager(config *ManagerConfig, factories ...Factory) (*Manager, error) {
log := logging.MustGetLogger("tp_manager")
ctx := context.Background()

done := make(chan struct{})

fMap := make(map[string]Factory)
for _, factory := range factories {
fMap[factory.Type()] = factory
}

entries, err := config.DiscoveryClient.GetTransportsByEdge(ctx, config.PubKey)
if err != nil {
log.Warnf("No transports found for local node: %v", err)
func NewManager(config *ManagerConfig, setupPKs []cipher.PubKey, factories ...Factory) (*Manager, error) {
tm := &Manager{
Logger: logging.MustGetLogger("tp_manager"),
conf: config,
setupPKS: setupPKs,
facs: make(map[string]Factory),
tps: make(map[uuid.UUID]*ManagedTransport),
setupCh: make(chan Transport, 9), // TODO: eliminate or justify buffering here
readCh: make(chan routing.Packet, 20),
done: make(chan struct{}),
}

rCh := make(chan routing.Packet, 20)
tpMap := make(map[uuid.UUID]*ManagedTransport)
for _, entry := range entries {
fac, ok := fMap[entry.Entry.Type]
if !ok {
log.Warnf("cannot revive transport entry: factory of type '%s' not supported", entry.Entry.Type)
continue
}
mTp := NewManagedTransport(fac, config.DiscoveryClient, config.LogStore, entry.Entry.RemoteEdge(config.PubKey), config.SecKey)
go mTp.Serve(rCh, done)
tpMap[entry.Entry.ID] = mTp
for _, factory := range factories {
tm.facs[factory.Type()] = factory
}

return &Manager{
Logger: log,
conf: config,
facs: fMap,
tps: tpMap,
setupCh: make(chan Transport, 9), // TODO: eliminate or justify buffering here
readCh: rCh,
done: done,
}, nil
return tm, nil
}

// Serve runs listening loop across all registered factories.
func (tm *Manager) Serve(ctx context.Context) error {
tm.initDefaultTransports(ctx)
tm.Logger.Infof("Default transports created.")
tm.mx.Lock()
tm.initTransports(ctx)
tm.mx.Unlock()

var wg sync.WaitGroup
for _, factory := range tm.facs {
@@ -112,24 +90,19 @@ func (tm *Manager) Serve(ctx context.Context) error {
return nil
}

// initDefaultTransports created transports to DefaultNodes if they don't exist.
func (tm *Manager) initDefaultTransports(ctx context.Context) {
for _, pk := range tm.conf.DefaultNodes {
pk := pk
exist := false
tm.WalkTransports(func(tr *ManagedTransport) bool {
if tr.Remote() == pk {
exist = true
return false
}
return true
})
if exist {
continue
}
_, err := tm.SaveTransport(ctx, pk, "dmsg")
if err != nil {
tm.Logger.Warnf("Failed to establish transport to a node %s: %s", pk, err)
func (tm *Manager) initTransports(ctx context.Context) {
entries, err := tm.conf.DiscoveryClient.GetTransportsByEdge(ctx, tm.conf.PubKey)
if err != nil {
log.Warnf("No transports found for local node: %v", err)
}
for _, entry := range entries {
var (
tpType = entry.Entry.Type
remote = entry.Entry.RemoteEdge(tm.conf.PubKey)
tpID = entry.Entry.ID
)
if _, err := tm.saveTransport(remote, tpType); err != nil {
tm.Logger.Warnf("INIT: failed to init tp: type(%s) remote(%s) tpID(%s)", tpType, remote, tpID)
}
}
}
@@ -181,7 +154,17 @@ func (tm *Manager) SaveTransport(ctx context.Context, remote cipher.PubKey, tpTy
if tm.isClosing() {
return nil, io.ErrClosedPipe
}
mTp, err := tm.saveTransport(remote, tpType)
if err != nil {
return nil, err
}
if err := mTp.Dial(ctx); err != nil {
tm.Logger.Warnf("underlying 'write' tp failed, will retry: %v", err)
}
return mTp, nil
}

func (tm *Manager) saveTransport(remote cipher.PubKey, tpType string) (*ManagedTransport, error) {
factory, ok := tm.facs[tpType]
if !ok {
return nil, errors.New("unknown transport type")
@@ -195,9 +178,6 @@ func (tm *Manager) SaveTransport(ctx context.Context, remote cipher.PubKey, tpTy
}

mTp := NewManagedTransport(factory, tm.conf.DiscoveryClient, tm.conf.LogStore, remote, tm.conf.SecKey)
if err := mTp.Dial(ctx); err != nil {
tm.Logger.Warnf("underlying 'write' tp failed, will retry: %v", err)
}
go mTp.Serve(tm.readCh, tm.done)
tm.tps[tpID] = mTp

@@ -235,10 +215,7 @@ func (tm *Manager) ReadPacket() (routing.Packet, error) {

// SetupPKs returns setup node list contained within the TransportManager.
func (tm *Manager) SetupPKs() []cipher.PubKey {
tm.mx.RLock()
pks := tm.setupPKS
tm.mx.RUnlock()
return pks
return tm.setupPKS
}

// IsSetupPK checks whether provided `pk` is of `setup` purpose.
@@ -251,13 +228,6 @@ func (tm *Manager) IsSetupPK(pk cipher.PubKey) bool {
return false
}

// SetSetupPKs sets setup node list contained within the TransportManager.
func (tm *Manager) SetSetupPKs(nodes []cipher.PubKey) {
tm.mx.Lock()
tm.setupPKS = nodes
tm.mx.Unlock()
}

// DialSetupConn dials to a remote setup node.
func (tm *Manager) DialSetupConn(ctx context.Context, remote cipher.PubKey, tpType string) (Transport, error) {
tm.mx.Lock()