Skip to content

Commit

Permalink
WIP: fix final bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
Evan Lin committed Jul 31, 2019
1 parent 6acae11 commit 7b2f582
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 10 deletions.
1 change: 1 addition & 0 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ func (r *Router) consumePacket(payload []byte, rule routing.Rule) error {
}

func (r *Router) forwardAppPacket(appConn *app.Protocol, packet *app.Packet) error {
fmt.Println(">>> PREPARING TO FORWARD APP PACKET!")
if packet.Loop.Remote.PubKey == r.config.PubKey {
return r.forwardLocalAppPacket(packet)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/router/visor.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (am *appManager) initApp(payload []byte) error {
return errors.New("unexpected app version")
}

am.Logger.Infof("Handshaked new connection with the app %s.v%s", config.AppName, config.AppVersion)
am.Logger.Infof("Finished initiating app: %s.v%s", config.AppName, config.AppVersion)
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/setup/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (sn *Node) serveTransport(tr transport.Transport) error {
}

sn.Logger.Infof("Got new Setup request with type %s: %s", sp, string(data))
defer sn.Logger.Infof("Completed Setup request with type %s: %s", sp, string(data))

startTime := time.Now()
switch sp {
Expand Down Expand Up @@ -256,9 +257,12 @@ func (sn *Node) remote(edges [2]cipher.PubKey) (cipher.PubKey, bool) {
}

func (sn *Node) closeLoop(on cipher.PubKey, ld routing.LoopData) error {
fmt.Printf(">>> BEGIN: closeLoop(%s, ld)\n", on)
defer fmt.Printf(">>> END: closeLoop(%s, ld)\n", on)
ctx := context.Background()

tr, err := sn.messenger.Dial(ctx, on)
fmt.Println(">>> *****: closeLoop() dialed:", err)
if err != nil {
return fmt.Errorf("transport: %s", err)
}
Expand Down
26 changes: 17 additions & 9 deletions pkg/transport/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package transport
import (
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
Expand Down Expand Up @@ -61,7 +62,7 @@ func NewManager(config *ManagerConfig, factories ...Factory) (*Manager, error) {
}

return &Manager{
Logger: logging.MustGetLogger("trmanager"),
Logger: logging.MustGetLogger("tp_manager"),
config: config,
factories: fMap,
transports: make(map[uuid.UUID]*ManagedTransport),
Expand Down Expand Up @@ -118,6 +119,8 @@ func (tm *Manager) WalkTransports(walk func(tp *ManagedTransport) bool) {

// reconnectTransports tries to reconnect previously established transports.
func (tm *Manager) reconnectTransports(ctx context.Context) {
defer tm.Logger.Println("Finished reconnecting transports.")

tm.mu.RLock()
entries := make(map[Entry]struct{})
for tmEntry := range tm.entries {
Expand All @@ -135,12 +138,15 @@ func (tm *Manager) reconnectTransports(ctx context.Context) {
continue
}

if !tm.IsSetupPK(remote) {
continue
}

_, err := tm.CreateDataTransport(ctx, remote, entry.Type, entry.Public)
if err != nil {
tm.Logger.Warnf("Failed to re-establish transport: %s", err)
continue
}

if _, err := tm.config.DiscoveryClient.UpdateStatuses(ctx, &Status{ID: entry.ID, IsUp: true}); err != nil {
tm.Logger.Warnf("Failed to change transport status: %s", err)
}
Expand Down Expand Up @@ -200,26 +206,22 @@ func (tm *Manager) Serve(ctx context.Context) error {
for {
select {
case <-ctx.Done():
tm.Logger.Info("Received ctx.Done()")
return
case <-tm.doneChan:
tm.Logger.Info("Received tm.doneCh")
return
default:
if _, err := tm.acceptTransport(ctx, f); err != nil {
if strings.Contains(err.Error(), "closed") {
return
}

tm.Logger.Warnf("Failed to accept connection: %s", err)
}
}

}
}(factory)
}

tm.Logger.Info("Starting transport manager")
tm.Logger.Info("TransportManager is serving.")
wg.Wait()
return nil
}
Expand Down Expand Up @@ -343,7 +345,9 @@ func (tm *Manager) dialTransport(ctx context.Context, factory Factory, remote ci
return nil, nil, err
}

fmt.Printf(">>> INITIATING SETTLEMENT HANDSHAKE: local(%s) remote(%s)\n", tm.config.PubKey, remote)
entry, err := settlementInitiatorHandshake(public).Do(tm, tr, time.Minute)
fmt.Printf("<<< COMPLETED SETTLEMENT HANDSHAKE: local(%s) remote(%s) error: %v\n", tm.config.PubKey, remote, err)
if err != nil {
go func() {
if err := tr.Close(); err != nil {
Expand Down Expand Up @@ -371,15 +375,19 @@ func (tm *Manager) acceptTransport(ctx context.Context, factory Factory) (Transp
return nil, errors.New("failed to determine remote edge of accepted transport")
}

if isSetup := tm.IsSetupPK(remotePK); isSetup {
fmt.Printf("{tp.Manager} Factory found tp: local(%s) remote(%s)\n", tm.config.PubKey, remotePK)

if tm.IsSetupPK(remotePK) {
select {
case <-tm.doneChan:
return nil, io.ErrClosedPipe
case tm.SetupTpChan <- tr:
default:
tm.SetupTpChan <- tr
return tr, nil
}
}

// For transports for purpose(data)...
entry, err := settlementResponderHandshake().Do(tm, tr, 30*time.Second)
if err != nil {
go func() {
Expand Down

0 comments on commit 7b2f582

Please sign in to comment.