Skip to content

Commit

Permalink
More strict typing in PacketRouter, Router, mockRouter
Browse files Browse the repository at this point in the history
Removed overgeneralization of ManagedTransport as Transport where it
makes no sense.

Tests passed.
  • Loading branch information
ayuryshev committed May 29, 2019
1 parent 6467305 commit 0fe0f59
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 35 deletions.
2 changes: 1 addition & 1 deletion pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type PacketRouter interface {
io.Closer
Serve(ctx context.Context) error
ServeApp(conn net.Conn, port uint16, appConf *app.Config) error
IsSetupTransport(tr transport.Transport) bool
IsSetupTransport(tr *transport.ManagedTransport) bool
}

// Node provides messaging runtime for Apps by setting up all
Expand Down
2 changes: 1 addition & 1 deletion pkg/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,6 @@ func (r *mockRouter) Close() error {
return nil
}

func (r *mockRouter) IsSetupTransport(tr transport.Transport) bool {
func (r *mockRouter) IsSetupTransport(tr *transport.ManagedTransport) bool {
return false
}
8 changes: 4 additions & 4 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (r *Router) Serve(ctx context.Context) error {
go func() {
for tr := range r.tm.TrChan {
if tr.Accepted {
go func(t transport.Transport) {
go func(t *transport.ManagedTransport) {
for {
var err error
if r.IsSetupTransport(t) {
Expand All @@ -100,7 +100,7 @@ func (r *Router) Serve(ctx context.Context) error {
}
}(tr)
} else {
go func(t transport.Transport) {
go func(t *transport.ManagedTransport) {
for {
if err := r.serveTransport(t); err != nil {
if err != io.EOF {
Expand Down Expand Up @@ -178,7 +178,7 @@ func (r *Router) Close() error {
return r.tm.Close()
}

func (r *Router) serveTransport(tr transport.Transport) error {
func (r *Router) serveTransport(tr *transport.ManagedTransport) error {
packet := make(routing.Packet, 6)
if _, err := io.ReadFull(tr, packet); err != nil {
return err
Expand Down Expand Up @@ -476,7 +476,7 @@ func (r *Router) advanceNoiseHandshake(addr *app.LoopAddr, noiseMsg []byte) (ni
}

// IsSetupTransport checks whether `tr` is running in the `setup` mode.
func (r *Router) IsSetupTransport(tr transport.Transport) bool {
func (r *Router) IsSetupTransport(tr *transport.ManagedTransport) bool {
for _, pk := range r.config.SetupNodes {
remote, ok := r.tm.Remote(tr.Edges())
if ok && (remote == pk) {
Expand Down
18 changes: 0 additions & 18 deletions pkg/setup/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,24 +106,6 @@ func (sn *Node) Serve(ctx context.Context) error {
}
}()

// go func() {
// for tr := range sn.tm.AcceptedTrChan {
// go func(t transport.Transport) {
// for {
// if err := sn.serveTransport(t); err != nil {
// sn.Logger.Warnf("Failed to serve Transport: %s", err)
// return
// }
// }
// }(tr)
// }
// }()

// go func() {
// for range sn.tm.DialedTrChan {
// }
// }()

sn.Logger.Info("Starting Setup Node")
return sn.tm.Serve(ctx)
}
Expand Down
13 changes: 2 additions & 11 deletions pkg/transport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@ type Manager struct {
entries map[Entry]struct{}

doneChan chan struct{}
// AcceptedTrChan chan *ManagedTransport
// DialedTrChan chan *ManagedTransport
TrChan chan *ManagedTransport
mu sync.RWMutex
TrChan chan *ManagedTransport
mu sync.RWMutex
}

// NewManager creates a Manager with the provided configuration and transport factories.
Expand Down Expand Up @@ -67,12 +65,6 @@ func NewManager(config *ManagerConfig, factories ...Factory) (*Manager, error) {
}, nil
}

// // Observe returns channel for notifications about new Transport
// // registration. Only single observer is supported.
// func (tm *Manager) Observe() (accept <-chan *ManagedTransport, dial <-chan *ManagedTransport) {
// return tm.AcceptedTrChan, tm.DialedTrChan
// }

// Factories returns all the factory types contained within the TransportManager.
func (tm *Manager) Factories() []string {
fTypes, i := make([]string, len(tm.factories)), 0
Expand Down Expand Up @@ -383,7 +375,6 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (*Manag
}
tm.mu.Unlock()

// go func(managedTr *ManagedTransport, tm *Manager) {
go func() {
select {
case <-managedTr.doneChan:
Expand Down

0 comments on commit 0fe0f59

Please sign in to comment.