Skip to content

Commit

Permalink
Remove transport.Manager dependency from setup node
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Jul 3, 2019
1 parent 6769fa6 commit 8985275
Showing 1 changed file with 38 additions and 24 deletions.
62 changes: 38 additions & 24 deletions pkg/setup/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,10 @@ type Hop struct {

// Node performs routes setup operations over messaging channel.
type Node struct {
Logger *logging.Logger

tm *transport.Manager
Logger *logging.Logger
messenger *dmsg.Client

srvCount int
metrics metrics.Recorder
srvCount int
metrics metrics.Recorder
}

// NewNode constructs a new SetupNode.
Expand Down Expand Up @@ -68,7 +65,6 @@ func NewNode(conf *Config, metrics metrics.Recorder) (*Node, error) {
return &Node{
Logger: logger.PackageLogger("routesetup"),
metrics: metrics,
tm: tm,
messenger: messenger,
srvCount: conf.Messaging.ServerCount,
}, nil
Expand All @@ -84,22 +80,23 @@ func (sn *Node) Serve(ctx context.Context) error {
}

go func() {
for tr := range sn.tm.TrChan {
if tr.Accepted {
go func(t transport.Transport) {
for {
if err := sn.serveTransport(t); err != nil {
sn.Logger.Warnf("Failed to serve Transport: %s", err)
return
}
}
}(tr)
for {
tp, err := sn.messenger.Accept(ctx)
if err != nil {
sn.Logger.Warnf("Failed to accept Transport: %s", err)
}
go func(tp transport.Transport) {
for {
if err := sn.serveTransport(tp); err != nil {
sn.Logger.Warnf("Failed to serve Transport: %s", err)
}
}
}(tp)
}
}()

sn.Logger.Info("Starting Setup Node")
return sn.tm.Serve(ctx)
return nil
}

func (sn *Node) createLoop(l *routing.Loop) error {
Expand Down Expand Up @@ -179,9 +176,9 @@ func (sn *Node) createRoute(expireAt time.Time, route routing.Route, rport, lpor
return routeID, nil
}

// Close closes underlying transport manager.
// Close closes underlying dmsg client.
func (sn *Node) Close() error {
return sn.tm.Close()
return sn.messenger.Close()
}

func (sn *Node) serveTransport(tr transport.Transport) error {
Expand All @@ -203,7 +200,7 @@ func (sn *Node) serveTransport(tr transport.Transport) error {
case PacketCloseLoop:
ld := &LoopData{}
if err = json.Unmarshal(data, ld); err == nil {
remote, ok := sn.tm.Remote(tr.Edges())
remote, ok := sn.remote(tr.Edges())
if !ok {
return errors.New("configured PubKey not found in edges")
}
Expand All @@ -222,8 +219,21 @@ func (sn *Node) serveTransport(tr transport.Transport) error {
return proto.WritePacket(RespSuccess, nil)
}

func (sn *Node) remote(edges [2]cipher.PubKey) (cipher.PubKey, bool) {
pubKey := sn.messenger.Local()
if pubKey == edges[0] {
return edges[1], true
}
if pubKey == edges[1] {
return edges[0], true
}
return cipher.PubKey{}, false
}

func (sn *Node) connectLoop(on cipher.PubKey, ld *LoopData) (noiseRes []byte, err error) {
tr, err := sn.tm.CreateTransport(context.Background(), on, dmsg.Type, false)
ctx := context.Background()

tr, err := sn.messenger.Dial(ctx, on)
if err != nil {
err = fmt.Errorf("transport: %s", err)
return
Expand All @@ -241,7 +251,9 @@ func (sn *Node) connectLoop(on cipher.PubKey, ld *LoopData) (noiseRes []byte, er
}

func (sn *Node) closeLoop(on cipher.PubKey, ld *LoopData) error {
tr, err := sn.tm.CreateTransport(context.Background(), on, dmsg.Type, false)
ctx := context.Background()

tr, err := sn.messenger.Dial(ctx, on)
if err != nil {
return fmt.Errorf("transport: %s", err)
}
Expand All @@ -257,7 +269,9 @@ func (sn *Node) closeLoop(on cipher.PubKey, ld *LoopData) error {
}

func (sn *Node) setupRule(pubKey cipher.PubKey, rule routing.Rule) (routeID routing.RouteID, err error) {
tr, err := sn.tm.CreateTransport(context.Background(), pubKey, dmsg.Type, false)
ctx := context.Background()

tr, err := sn.messenger.Dial(ctx, pubKey)
if err != nil {
err = fmt.Errorf("transport: %s", err)
return
Expand Down

0 comments on commit 8985275

Please sign in to comment.