From 6467305d3127f4ab6ae78c79647dd2d0b8f664ff Mon Sep 17 00:00:00 2001 From: Alex Yu Date: Tue, 28 May 2019 09:49:22 +0300 Subject: [PATCH] 1. ManagedTransport: new field bool Accepted - to distinguish dialled and accepted transports 2. Manager: it's only one channel now for both accepted and dialled transports 3. Router.Serve: it's only one loop for both accepted and dialled transports 4. setup.Node changes in the same way Tested by `make test` Integration messaging tests shows changed behaviour: - the first message is **always** fail - then communication works normal Need more thorough tests --- pkg/router/router.go | 59 ++++++++++++++---------------- pkg/router/router_test.go | 28 +++++++++++--- pkg/setup/node.go | 40 ++++++++++++++------ pkg/setup/node_test.go | 8 +--- pkg/transport/managed_transport.go | 4 +- pkg/transport/manager.go | 44 +++++++++++----------- pkg/transport/manager_test.go | 17 ++++++--- 7 files changed, 115 insertions(+), 85 deletions(-) diff --git a/pkg/router/router.go b/pkg/router/router.go index 0c2b2d9877..7a4c78ed28 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -78,44 +78,39 @@ func New(config *Config) *Router { // Serve starts transport listening loop. func (r *Router) Serve(ctx context.Context) error { + go func() { - for tr := range r.tm.AcceptedTrChan { - go func(t transport.Transport) { - for { - var err error - if r.IsSetupTransport(t) { - err = r.rm.Serve(t) - } else { - err = r.serveTransport(t) - } + for tr := range r.tm.TrChan { + if tr.Accepted { + go func(t transport.Transport) { + for { + var err error + if r.IsSetupTransport(t) { + err = r.rm.Serve(t) + } else { + err = r.serveTransport(t) + } - if err != nil { - if err != io.EOF { - r.Logger.Warnf("Stopped serving Transport: %s", err) + if err != nil { + if err != io.EOF { + r.Logger.Warnf("Stopped serving Transport: %s", err) + } + return } - return } - } - }(tr) - } - }() - - go func() { - for tr := range r.tm.DialedTrChan { - if r.IsSetupTransport(tr) { - continue - } - - go func(t transport.Transport) { - for { - if err := r.serveTransport(t); err != nil { - if err != io.EOF { - r.Logger.Warnf("Stopped serving Transport: %s", err) + }(tr) + } else { + go func(t transport.Transport) { + for { + if err := r.serveTransport(t); err != nil { + if err != io.EOF { + r.Logger.Warnf("Stopped serving Transport: %s", err) + } + return } - return } - } - }(tr) + }(tr) + } } }() diff --git a/pkg/router/router_test.go b/pkg/router/router_test.go index 1470a8009d..d07d6d0f96 100644 --- a/pkg/router/router_test.go +++ b/pkg/router/router_test.go @@ -487,8 +487,14 @@ func TestRouterSetupLoop(t *testing.T) { r := New(conf) errCh := make(chan error) go func() { - acceptCh, _ := m2.Observe() - tr := <-acceptCh + // acceptCh, _ := m2.Observe() + // tr := <-acceptCh + var tr *transport.ManagedTransport + for tr = range m2.TrChan { + if tr.Accepted { + break + } + } proto := setup.NewSetupProtocol(tr) p, data, err := proto.ReadPacket() @@ -593,8 +599,14 @@ func TestRouterCloseLoop(t *testing.T) { r := New(conf) errCh := make(chan error) go func() { - acceptCh, _ := m2.Observe() - tr := <-acceptCh + // acceptCh, _ := m2.Observe() + // tr := <-acceptCh + var tr *transport.ManagedTransport + for tr = range m2.TrChan { + if tr.Accepted { + break + } + } proto := setup.NewSetupProtocol(tr) p, data, err := proto.ReadPacket() @@ -681,8 +693,12 @@ func TestRouterCloseLoopOnAppClose(t *testing.T) { r := New(conf) errCh := make(chan error) go func() { - acceptCh, _ := m2.Observe() - tr := <-acceptCh + var tr *transport.ManagedTransport + for tr = range m2.TrChan { + if tr.Accepted { + break + } + } proto := setup.NewSetupProtocol(tr) p, data, err := proto.ReadPacket() diff --git a/pkg/setup/node.go b/pkg/setup/node.go index 65bac0926a..5f1a56ac57 100644 --- a/pkg/setup/node.go +++ b/pkg/setup/node.go @@ -91,22 +91,38 @@ func (sn *Node) Serve(ctx context.Context) error { } go func() { - for tr := range sn.tm.AcceptedTrChan { - go func(t transport.Transport) { - for { - if err := sn.serveTransport(t); err != nil { - sn.Logger.Warnf("Failed to serve Transport: %s", err) - return + for tr := range sn.tm.TrChan { + + if tr.Accepted { + go func(t transport.Transport) { + for { + if err := sn.serveTransport(t); err != nil { + sn.Logger.Warnf("Failed to serve Transport: %s", err) + return + } } - } - }(tr) + }(tr) + } } }() - go func() { - for range sn.tm.DialedTrChan { - } - }() + // go func() { + // for tr := range sn.tm.AcceptedTrChan { + // go func(t transport.Transport) { + // for { + // if err := sn.serveTransport(t); err != nil { + // sn.Logger.Warnf("Failed to serve Transport: %s", err) + // return + // } + // } + // }(tr) + // } + // }() + + // go func() { + // for range sn.tm.DialedTrChan { + // } + // }() sn.Logger.Info("Starting Setup Node") return sn.tm.Serve(ctx) diff --git a/pkg/setup/node_test.go b/pkg/setup/node_test.go index cf0062082c..c5bd4878a6 100644 --- a/pkg/setup/node_test.go +++ b/pkg/setup/node_test.go @@ -274,13 +274,7 @@ func newMockNode(tm *transport.Manager) *mockNode { func (n *mockNode) serve() error { go func() { - for tr := range n.tm.DialedTrChan { - go func(t transport.Transport) { n.serveTransport(t) }(tr) // nolint: errcheck - } - }() - - go func() { - for tr := range n.tm.AcceptedTrChan { + for tr := range n.tm.TrChan { go func(t transport.Transport) { n.serveTransport(t) }(tr) // nolint: errcheck } }() diff --git a/pkg/transport/managed_transport.go b/pkg/transport/managed_transport.go index 29ac43e567..c801b29934 100644 --- a/pkg/transport/managed_transport.go +++ b/pkg/transport/managed_transport.go @@ -13,6 +13,7 @@ type ManagedTransport struct { Transport ID uuid.UUID Public bool + Accepted bool LogEntry *LogEntry doneChan chan struct{} @@ -23,11 +24,12 @@ type ManagedTransport struct { writeLogChan chan int } -func newManagedTransport(id uuid.UUID, tr Transport, public bool) *ManagedTransport { +func newManagedTransport(id uuid.UUID, tr Transport, public bool, accepted bool) *ManagedTransport { return &ManagedTransport{ ID: id, Transport: tr, Public: public, + Accepted: accepted, doneChan: make(chan struct{}), errChan: make(chan error), readLogChan: make(chan int), diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index e1a7b9b1f4..b5b3e67f21 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -32,10 +32,11 @@ type Manager struct { transports map[uuid.UUID]*ManagedTransport entries map[Entry]struct{} - doneChan chan struct{} - AcceptedTrChan chan *ManagedTransport - DialedTrChan chan *ManagedTransport - mu sync.RWMutex + doneChan chan struct{} + // AcceptedTrChan chan *ManagedTransport + // DialedTrChan chan *ManagedTransport + TrChan chan *ManagedTransport + mu sync.RWMutex } // NewManager creates a Manager with the provided configuration and transport factories. @@ -54,22 +55,23 @@ func NewManager(config *ManagerConfig, factories ...Factory) (*Manager, error) { } return &Manager{ - Logger: logging.MustGetLogger("trmanager"), - config: config, - factories: fMap, - transports: make(map[uuid.UUID]*ManagedTransport), - entries: mEntries, - AcceptedTrChan: make(chan *ManagedTransport, 10), - DialedTrChan: make(chan *ManagedTransport, 10), - doneChan: make(chan struct{}), + Logger: logging.MustGetLogger("trmanager"), + config: config, + factories: fMap, + transports: make(map[uuid.UUID]*ManagedTransport), + entries: mEntries, + // AcceptedTrChan: make(chan *ManagedTransport, 10), + // DialedTrChan: make(chan *ManagedTransport, 10), + TrChan: make(chan *ManagedTransport, 9), //IDK why it was 10 before + doneChan: make(chan struct{}), }, nil } -// Observe returns channel for notifications about new Transport -// registration. Only single observer is supported. -func (tm *Manager) Observe() (accept <-chan *ManagedTransport, dial <-chan *ManagedTransport) { - return tm.AcceptedTrChan, tm.DialedTrChan -} +// // Observe returns channel for notifications about new Transport +// // registration. Only single observer is supported. +// func (tm *Manager) Observe() (accept <-chan *ManagedTransport, dial <-chan *ManagedTransport) { +// return tm.AcceptedTrChan, tm.DialedTrChan +// } // Factories returns all the factory types contained within the TransportManager. func (tm *Manager) Factories() []string { @@ -298,12 +300,12 @@ func (tm *Manager) createTransport(ctx context.Context, remote cipher.PubKey, tp } tm.Logger.Infof("Dialed to %s using %s factory. Transport ID: %s", remote, tpType, entry.ID) - managedTr := newManagedTransport(entry.ID, tr, entry.Public) + managedTr := newManagedTransport(entry.ID, tr, entry.Public, false) tm.mu.Lock() tm.transports[entry.ID] = managedTr select { case <-tm.doneChan: - case tm.DialedTrChan <- managedTr: + case tm.TrChan <- managedTr: default: } tm.mu.Unlock() @@ -370,13 +372,13 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*Manag } tm.Logger.Infof("Accepted new transport with type %s from %s. ID: %s", factory.Type(), remote, entry.ID) - managedTr := newManagedTransport(entry.ID, tr, entry.Public) + managedTr := newManagedTransport(entry.ID, tr, entry.Public, true) tm.mu.Lock() tm.transports[entry.ID] = managedTr select { case <-tm.doneChan: - case tm.AcceptedTrChan <- managedTr: + case tm.TrChan <- managedTr: default: } tm.mu.Unlock() diff --git a/pkg/transport/manager_test.go b/pkg/transport/manager_test.go index befba6e42b..c0443c6ad2 100644 --- a/pkg/transport/manager_test.go +++ b/pkg/transport/manager_test.go @@ -41,21 +41,26 @@ func TestTransportManager(t *testing.T) { var mu sync.Mutex m1Observed := uint32(0) - acceptCh, _ := m1.Observe() + + acceptCh := m1.TrChan go func() { - for range acceptCh { + for tr := range acceptCh { mu.Lock() - m1Observed++ + if tr.Accepted { + m1Observed++ + } mu.Unlock() } }() m2Observed := uint32(0) - _, dialCh := m2.Observe() + dialCh := m2.TrChan go func() { - for range dialCh { + for tr := range dialCh { mu.Lock() - m2Observed++ + if !tr.Accepted { + m2Observed++ + } mu.Unlock() } }()