Skip to content

Commit

Permalink
1. ManagedTransport: new field bool Accepted - to distinguish dialled
Browse files Browse the repository at this point in the history
and accepted transports
2. Manager: it's only one channel now for both accepted and dialled
transports
3. Router.Serve: it's only one loop for both accepted and dialled
transports
4. setup.Node changes in the same way

Tested by `make test`

Integration messaging tests shows changed behaviour:

- the first message is **always** fail
- then communication works normal

Need more thorough tests
  • Loading branch information
ayuryshev committed May 28, 2019
1 parent 5716e7b commit 6467305
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 85 deletions.
59 changes: 27 additions & 32 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,44 +78,39 @@ func New(config *Config) *Router {

// Serve starts transport listening loop.
func (r *Router) Serve(ctx context.Context) error {

go func() {
for tr := range r.tm.AcceptedTrChan {
go func(t transport.Transport) {
for {
var err error
if r.IsSetupTransport(t) {
err = r.rm.Serve(t)
} else {
err = r.serveTransport(t)
}
for tr := range r.tm.TrChan {
if tr.Accepted {
go func(t transport.Transport) {
for {
var err error
if r.IsSetupTransport(t) {
err = r.rm.Serve(t)
} else {
err = r.serveTransport(t)
}

if err != nil {
if err != io.EOF {
r.Logger.Warnf("Stopped serving Transport: %s", err)
if err != nil {
if err != io.EOF {
r.Logger.Warnf("Stopped serving Transport: %s", err)
}
return
}
return
}
}
}(tr)
}
}()

go func() {
for tr := range r.tm.DialedTrChan {
if r.IsSetupTransport(tr) {
continue
}

go func(t transport.Transport) {
for {
if err := r.serveTransport(t); err != nil {
if err != io.EOF {
r.Logger.Warnf("Stopped serving Transport: %s", err)
}(tr)
} else {
go func(t transport.Transport) {
for {
if err := r.serveTransport(t); err != nil {
if err != io.EOF {
r.Logger.Warnf("Stopped serving Transport: %s", err)
}
return
}
return
}
}
}(tr)
}(tr)
}
}
}()

Expand Down
28 changes: 22 additions & 6 deletions pkg/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,14 @@ func TestRouterSetupLoop(t *testing.T) {
r := New(conf)
errCh := make(chan error)
go func() {
acceptCh, _ := m2.Observe()
tr := <-acceptCh
// acceptCh, _ := m2.Observe()
// tr := <-acceptCh
var tr *transport.ManagedTransport
for tr = range m2.TrChan {
if tr.Accepted {
break
}
}

proto := setup.NewSetupProtocol(tr)
p, data, err := proto.ReadPacket()
Expand Down Expand Up @@ -593,8 +599,14 @@ func TestRouterCloseLoop(t *testing.T) {
r := New(conf)
errCh := make(chan error)
go func() {
acceptCh, _ := m2.Observe()
tr := <-acceptCh
// acceptCh, _ := m2.Observe()
// tr := <-acceptCh
var tr *transport.ManagedTransport
for tr = range m2.TrChan {
if tr.Accepted {
break
}
}

proto := setup.NewSetupProtocol(tr)
p, data, err := proto.ReadPacket()
Expand Down Expand Up @@ -681,8 +693,12 @@ func TestRouterCloseLoopOnAppClose(t *testing.T) {
r := New(conf)
errCh := make(chan error)
go func() {
acceptCh, _ := m2.Observe()
tr := <-acceptCh
var tr *transport.ManagedTransport
for tr = range m2.TrChan {
if tr.Accepted {
break
}
}

proto := setup.NewSetupProtocol(tr)
p, data, err := proto.ReadPacket()
Expand Down
40 changes: 28 additions & 12 deletions pkg/setup/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,38 @@ func (sn *Node) Serve(ctx context.Context) error {
}

go func() {
for tr := range sn.tm.AcceptedTrChan {
go func(t transport.Transport) {
for {
if err := sn.serveTransport(t); err != nil {
sn.Logger.Warnf("Failed to serve Transport: %s", err)
return
for tr := range sn.tm.TrChan {

if tr.Accepted {
go func(t transport.Transport) {
for {
if err := sn.serveTransport(t); err != nil {
sn.Logger.Warnf("Failed to serve Transport: %s", err)
return
}
}
}
}(tr)
}(tr)
}
}
}()

go func() {
for range sn.tm.DialedTrChan {
}
}()
// go func() {
// for tr := range sn.tm.AcceptedTrChan {
// go func(t transport.Transport) {
// for {
// if err := sn.serveTransport(t); err != nil {
// sn.Logger.Warnf("Failed to serve Transport: %s", err)
// return
// }
// }
// }(tr)
// }
// }()

// go func() {
// for range sn.tm.DialedTrChan {
// }
// }()

sn.Logger.Info("Starting Setup Node")
return sn.tm.Serve(ctx)
Expand Down
8 changes: 1 addition & 7 deletions pkg/setup/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,7 @@ func newMockNode(tm *transport.Manager) *mockNode {

func (n *mockNode) serve() error {
go func() {
for tr := range n.tm.DialedTrChan {
go func(t transport.Transport) { n.serveTransport(t) }(tr) // nolint: errcheck
}
}()

go func() {
for tr := range n.tm.AcceptedTrChan {
for tr := range n.tm.TrChan {
go func(t transport.Transport) { n.serveTransport(t) }(tr) // nolint: errcheck
}
}()
Expand Down
4 changes: 3 additions & 1 deletion pkg/transport/managed_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type ManagedTransport struct {
Transport
ID uuid.UUID
Public bool
Accepted bool
LogEntry *LogEntry

doneChan chan struct{}
Expand All @@ -23,11 +24,12 @@ type ManagedTransport struct {
writeLogChan chan int
}

func newManagedTransport(id uuid.UUID, tr Transport, public bool) *ManagedTransport {
func newManagedTransport(id uuid.UUID, tr Transport, public bool, accepted bool) *ManagedTransport {
return &ManagedTransport{
ID: id,
Transport: tr,
Public: public,
Accepted: accepted,
doneChan: make(chan struct{}),
errChan: make(chan error),
readLogChan: make(chan int),
Expand Down
44 changes: 23 additions & 21 deletions pkg/transport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ type Manager struct {
transports map[uuid.UUID]*ManagedTransport
entries map[Entry]struct{}

doneChan chan struct{}
AcceptedTrChan chan *ManagedTransport
DialedTrChan chan *ManagedTransport
mu sync.RWMutex
doneChan chan struct{}
// AcceptedTrChan chan *ManagedTransport
// DialedTrChan chan *ManagedTransport
TrChan chan *ManagedTransport
mu sync.RWMutex
}

// NewManager creates a Manager with the provided configuration and transport factories.
Expand All @@ -54,22 +55,23 @@ func NewManager(config *ManagerConfig, factories ...Factory) (*Manager, error) {
}

return &Manager{
Logger: logging.MustGetLogger("trmanager"),
config: config,
factories: fMap,
transports: make(map[uuid.UUID]*ManagedTransport),
entries: mEntries,
AcceptedTrChan: make(chan *ManagedTransport, 10),
DialedTrChan: make(chan *ManagedTransport, 10),
doneChan: make(chan struct{}),
Logger: logging.MustGetLogger("trmanager"),
config: config,
factories: fMap,
transports: make(map[uuid.UUID]*ManagedTransport),
entries: mEntries,
// AcceptedTrChan: make(chan *ManagedTransport, 10),
// DialedTrChan: make(chan *ManagedTransport, 10),
TrChan: make(chan *ManagedTransport, 9), //IDK why it was 10 before
doneChan: make(chan struct{}),
}, nil
}

// Observe returns channel for notifications about new Transport
// registration. Only single observer is supported.
func (tm *Manager) Observe() (accept <-chan *ManagedTransport, dial <-chan *ManagedTransport) {
return tm.AcceptedTrChan, tm.DialedTrChan
}
// // Observe returns channel for notifications about new Transport
// // registration. Only single observer is supported.
// func (tm *Manager) Observe() (accept <-chan *ManagedTransport, dial <-chan *ManagedTransport) {
// return tm.AcceptedTrChan, tm.DialedTrChan
// }

// Factories returns all the factory types contained within the TransportManager.
func (tm *Manager) Factories() []string {
Expand Down Expand Up @@ -298,12 +300,12 @@ func (tm *Manager) createTransport(ctx context.Context, remote cipher.PubKey, tp
}

tm.Logger.Infof("Dialed to %s using %s factory. Transport ID: %s", remote, tpType, entry.ID)
managedTr := newManagedTransport(entry.ID, tr, entry.Public)
managedTr := newManagedTransport(entry.ID, tr, entry.Public, false)
tm.mu.Lock()
tm.transports[entry.ID] = managedTr
select {
case <-tm.doneChan:
case tm.DialedTrChan <- managedTr:
case tm.TrChan <- managedTr:
default:
}
tm.mu.Unlock()
Expand Down Expand Up @@ -370,13 +372,13 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*Manag
}

tm.Logger.Infof("Accepted new transport with type %s from %s. ID: %s", factory.Type(), remote, entry.ID)
managedTr := newManagedTransport(entry.ID, tr, entry.Public)
managedTr := newManagedTransport(entry.ID, tr, entry.Public, true)
tm.mu.Lock()

tm.transports[entry.ID] = managedTr
select {
case <-tm.doneChan:
case tm.AcceptedTrChan <- managedTr:
case tm.TrChan <- managedTr:
default:
}
tm.mu.Unlock()
Expand Down
17 changes: 11 additions & 6 deletions pkg/transport/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,26 @@ func TestTransportManager(t *testing.T) {

var mu sync.Mutex
m1Observed := uint32(0)
acceptCh, _ := m1.Observe()

acceptCh := m1.TrChan
go func() {
for range acceptCh {
for tr := range acceptCh {
mu.Lock()
m1Observed++
if tr.Accepted {
m1Observed++
}
mu.Unlock()
}
}()

m2Observed := uint32(0)
_, dialCh := m2.Observe()
dialCh := m2.TrChan
go func() {
for range dialCh {
for tr := range dialCh {
mu.Lock()
m2Observed++
if !tr.Accepted {
m2Observed++
}
mu.Unlock()
}
}()
Expand Down

0 comments on commit 6467305

Please sign in to comment.