From c86c9e3e69c0951c30466b19ef68787c349fbc52 Mon Sep 17 00:00:00 2001 From: Evan Lin Date: Fri, 6 Sep 2019 22:34:03 +0800 Subject: [PATCH 1/8] Began work to fix bug where visor node restart does not reestablish routes. --- pkg/setup/config.go | 4 +- pkg/setup/node.go | 71 +++++++++++++++--------------- pkg/transport/managed_transport.go | 3 +- pkg/transport/manager.go | 21 --------- 4 files changed, 39 insertions(+), 60 deletions(-) diff --git a/pkg/setup/config.go b/pkg/setup/config.go index ccb4ddad30..e30becc02c 100644 --- a/pkg/setup/config.go +++ b/pkg/setup/config.go @@ -8,8 +8,8 @@ import ( // Various timeouts for setup node. const ( - ServeTransportTimeout = time.Second * 30 - ReadTimeout = time.Second * 10 + RequestTimeout = time.Second * 30 + ReadTimeout = time.Second * 10 ) // Config defines configuration parameters for setup Node. diff --git a/pkg/setup/node.go b/pkg/setup/node.go index 5577b2b2af..f37e0c1c9b 100644 --- a/pkg/setup/node.go +++ b/pkg/setup/node.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "sync" "time" "github.com/google/uuid" @@ -78,15 +77,15 @@ func (sn *Node) Serve(ctx context.Context) error { return err } go func(conn *dmsg.Transport) { - if err := sn.serveTransport(ctx, conn); err != nil { + if err := sn.handleRequest(ctx, conn); err != nil { sn.Logger.Warnf("Failed to serve Transport: %s", err) } }(conn) } } -func (sn *Node) serveTransport(ctx context.Context, tr *dmsg.Transport) error { - ctx, cancel := context.WithTimeout(ctx, ServeTransportTimeout) +func (sn *Node) handleRequest(ctx context.Context, tr *dmsg.Transport) error { + ctx, cancel := context.WithTimeout(ctx, RequestTimeout) defer cancel() proto := NewSetupProtocol(tr) @@ -95,36 +94,44 @@ func (sn *Node) serveTransport(ctx context.Context, tr *dmsg.Transport) error { return err } - sn.Logger.Infof("Got new Setup request with type %s: %s", sp, string(data)) - defer sn.Logger.Infof("Completed Setup request with type %s: %s", sp, string(data)) + log := sn.Logger.WithField("requester", tr.RemotePK()).WithField("reqType", sp) + log.Infof("Received request.") startTime := time.Now() + switch sp { case PacketCreateLoop: var ld routing.LoopDescriptor - if err = json.Unmarshal(data, &ld); err == nil { - err = sn.createLoop(ctx, ld) + if err = json.Unmarshal(data, &ld); err != nil { + break } + ldJson, _ := json.MarshalIndent(ld, "", "\t") + log.Infof("CreateLoop loop descriptor: %s", string(ldJson)) + err = sn.createLoop(ctx, ld) + case PacketCloseLoop: var ld routing.LoopData - if err = json.Unmarshal(data, &ld); err == nil { - err = sn.closeLoop(ctx, ld.Loop.Remote.PubKey, routing.LoopData{ - Loop: routing.Loop{ - Remote: ld.Loop.Local, - Local: ld.Loop.Remote, - }, - }) + if err = json.Unmarshal(data, &ld); err != nil { + break } + err = sn.closeLoop(ctx, ld.Loop.Remote.PubKey, routing.LoopData{ + Loop: routing.Loop{ + Remote: ld.Loop.Local, + Local: ld.Loop.Remote, + }, + }) + default: err = errors.New("unknown foundation packet") } sn.metrics.Record(time.Since(startTime), err != nil) if err != nil { - sn.Logger.Infof("Setup request with type %s failed: %s", sp, err) + log.WithError(err).Warnf("Request completed with error.") return proto.WritePacket(RespFailure, err) } + log.Infof("Request completed successfully.") return proto.WritePacket(RespSuccess, nil) } @@ -215,13 +222,12 @@ func (sn *Node) createLoop(ctx context.Context, ld routing.LoopDescriptor) error // // During the setup process each error received along the way causes all the procedure to be canceled. 
RouteID received // from the 1st step connecting to the initiating node is used as the ID for the overall rule, thus being returned. -func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route routing.Route, - rport, lport routing.Port) (routing.RouteID, error) { +func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route routing.Route, rPort, lPort routing.Port) (routing.RouteID, error) { if len(route) == 0 { return 0, nil } - sn.Logger.Infof("Creating new Route %s", route) + sn.Logger.Infof("Creating a new Route %s", route) // add the initiating node to the start of the route. We need to loop over all the visor nodes // along the route to apply rules including the initiating one @@ -251,7 +257,7 @@ func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route resultingRouteIDCh := make(chan routing.RouteID, 2) // context to cancel rule setup in case of errors - cancellableCtx, cancel := context.WithCancel(ctx) + ctx, cancel := context.WithCancel(ctx) for i := len(r) - 1; i >= 0; i-- { var reqIDChIn, reqIDChOut chan routing.RouteID // goroutine[0] doesn't need to pass the route ID from the 1st step to anyone @@ -268,12 +274,11 @@ func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route nextTpID = r[i+1].Transport rule = routing.ForwardRule(keepAlive, 0, nextTpID, 0) } else { - rule = routing.AppRule(keepAlive, 0, 0, init, lport, rport) + rule = routing.AppRule(keepAlive, 0, 0, init, lPort, rPort) } - go func(i int, pk cipher.PubKey, rule routing.Rule, reqIDChIn <-chan routing.RouteID, - reqIDChOut chan<- routing.RouteID) { - routeID, err := sn.setupRule(cancellableCtx, pk, rule, reqIDChIn, reqIDChOut) + go func(i int, pk cipher.PubKey, rule routing.Rule, reqIDChIn <-chan routing.RouteID, reqIDChOut chan<- routing.RouteID) { + routeID, err := sn.setupRule(ctx, pk, rule, reqIDChIn, reqIDChOut) // adding rule for initiator must result with a route ID for the overall route // it doesn't matter for now if there was an error, this result will be fetched only if there wasn't one if i == 0 { @@ -295,17 +300,16 @@ func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route } var rulesSetupErr error - var cancelOnce sync.Once // check for any errors occurred so far for range r { // filter out context cancellation errors if err := <-rulesSetupErrs; err != nil && err != context.Canceled { // rules setup failed, cancel further setup - cancelOnce.Do(cancel) + cancel() rulesSetupErr = err } } - cancelOnce.Do(cancel) + cancel() // close chan to avoid leaks close(rulesSetupErrs) @@ -349,11 +353,7 @@ func (sn *Node) Close() error { } func (sn *Node) closeLoop(ctx context.Context, on cipher.PubKey, ld routing.LoopData) error { - fmt.Printf(">>> BEGIN: closeLoop(%s, ld)\n", on) - defer fmt.Printf(">>> END: closeLoop(%s, ld)\n", on) - proto, err := sn.dialAndCreateProto(ctx, on) - fmt.Println(">>> *****: closeLoop() dialed:", err) if err != nil { return err } @@ -367,10 +367,9 @@ func (sn *Node) closeLoop(ctx context.Context, on cipher.PubKey, ld routing.Loop return nil } -func (sn *Node) setupRule(ctx context.Context, pk cipher.PubKey, rule routing.Rule, - reqIDChIn <-chan routing.RouteID, reqIDChOut chan<- routing.RouteID) (routing.RouteID, error) { - sn.Logger.Debugf("trying to setup setup rule: %v with %s\n", rule, pk) - requestRouteID, err := sn.requestRouteID(ctx, pk) +func (sn *Node) setupRule(ctx context.Context, pk cipher.PubKey, rule routing.Rule, reqIDChIn <-chan routing.RouteID, reqIDChOut chan<- 
routing.RouteID) (routing.RouteID, error) {
+	sn.Logger.Debugf("trying to setup setup rule: %v with %s", rule, pk)
+	requestRouteID, err := sn.requestRouteID(ctx, pk) // take this.
 	if err != nil {
 		return 0, err
 	}
@@ -386,7 +385,7 @@ func (sn *Node) setupRule(ctx context.Context, pk cipher.PubKey, rule routing.Ru
 
 	rule.SetRequestRouteID(requestRouteID)
 
-	sn.Logger.Debugf("dialing to %s to setup rule: %v\n", pk, rule)
+	sn.Logger.Debugf("dialing to %s to setup rule: %v", pk, rule)
 
 	if err := sn.addRule(ctx, pk, rule); err != nil {
 		return 0, err
diff --git a/pkg/transport/managed_transport.go b/pkg/transport/managed_transport.go
index a4f7470e1e..a61869fdf1 100644
--- a/pkg/transport/managed_transport.go
+++ b/pkg/transport/managed_transport.go
@@ -108,6 +108,7 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet, done <-chan stru
 		mt.connMx.Unlock()
 	}()
 
+	// Read loop.
 	go func() {
 		defer func() {
 			mt.log.Infof("closed readPacket loop.")
@@ -133,6 +134,7 @@ func (mt *ManagedTransport) Serve(readCh chan<- routing.Packet, done <-chan stru
 		}
 	}()
 
+	// Redial loop.
 	for {
 		select {
 		case <-mt.done:
@@ -225,7 +227,6 @@ func (mt *ManagedTransport) Dial(ctx context.Context) error {
 	return mt.dial(ctx)
 }
 
-// TODO: Figure out where this fella is called.
 func (mt *ManagedTransport) dial(ctx context.Context) error {
 	tp, err := mt.n.Dial(mt.netName, mt.rPK, snet.TransportPort)
 	if err != nil {
diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go
index 206aa407fd..ccc246207a 100644
--- a/pkg/transport/manager.go
+++ b/pkg/transport/manager.go
@@ -116,27 +116,6 @@ func (tm *Manager) serve(ctx context.Context) {
 	}
 }
 
-// TODO(nkryuchkov): either use or remove if unused
-// func (tm *Manager) initTransports(ctx context.Context) {
-// 	tm.mx.Lock()
-// 	defer tm.mx.Unlock()
-//
-// 	entries, err := tm.conf.DiscoveryClient.GetTransportsByEdge(ctx, tm.conf.PubKey)
-// 	if err != nil {
-// 		log.Warnf("No transports found for local node: %v", err)
-// 	}
-// 	for _, entry := range entries {
-// 		var (
-// 			tpType = entry.Entry.Type
-// 			remote = entry.Entry.RemoteEdge(tm.conf.PubKey)
-// 			tpID   = entry.Entry.ID
-// 		)
-// 		if _, err := tm.saveTransport(remote, tpType); err != nil {
-// 			tm.Logger.Warnf("INIT: failed to init tp: type(%s) remote(%s) tpID(%s)", tpType, remote, tpID)
-// 		}
-// 	}
-// }
-
 func (tm *Manager) acceptTransport(ctx context.Context, lis *snet.Listener) error {
 	conn, err := lis.AcceptConn() // TODO: tcp panic.
 	if err != nil {

From 461838e20e08fbff7aff552e353cdc09c18fc676 Mon Sep 17 00:00:00 2001
From: Evan Lin
Date: Mon, 9 Sep 2019 01:52:59 +0800
Subject: [PATCH 2/8] Changed behaviour of setup.

* Reserving route IDs and adding rules to visors is now split into two
  communication steps.
* Improved readability and testability of the setup procedure by splitting
  responsibilities into additional structures: setup.idReservoir and
  setup.RulesMap.
* Improved logging for the setup procedure.
* Slightly tweaked setup.Protocol to accommodate the aforementioned changes.

A sketch of the resulting two-step exchange follows below.
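In rough terms, the new exchange between the setup node and each visor works as
in the sketch below. This is an illustrative summary only, not an excerpt from
the patch: it assumes a single established *setup.Protocol connection "p" to one
visor (the real setup node dials a fresh connection per operation), and it trims
the error handling and logging. RequestRouteIDs, AddRules and
Rule.SetRequestRouteID are the real helpers introduced or kept by this patch.

	// twoStepSketch shows the two communication steps the setup node now
	// performs against a single visor: reserve route IDs first, then register
	// fully-formed rules keyed under those IDs.
	func twoStepSketch(ctx context.Context, p *Protocol, rules []routing.Rule) error {
		// Step 1: reserve len(rules) route IDs. On the visor side,
		// routeManager.occupyRouteID pre-registers one placeholder rule per
		// requested ID and returns the reserved IDs.
		ids, err := RequestRouteIDs(ctx, p, uint8(len(rules)))
		if err != nil {
			return err
		}
		// Step 2: key each rule with its reserved ID and push the whole batch
		// to the visor in a single AddRules request.
		for i := range rules {
			rules[i].SetRequestRouteID(ids[i])
		}
		return AddRules(ctx, p, rules)
	}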
--- pkg/router/route_manager.go | 20 +- pkg/router/route_manager_test.go | 12 +- pkg/routing/rule.go | 18 +- pkg/setup/idreservoir.go | 164 ++++++++ pkg/setup/node.go | 339 ++++------------ pkg/setup/node_test.go | 657 ++++++++++++++++--------------- pkg/setup/protocol.go | 29 +- pkg/visor/rpc_client.go | 1 + 8 files changed, 637 insertions(+), 603 deletions(-) create mode 100644 pkg/setup/idreservoir.go diff --git a/pkg/router/route_manager.go b/pkg/router/route_manager.go index 25faf40d0c..b1717b18a6 100644 --- a/pkg/router/route_manager.go +++ b/pkg/router/route_manager.go @@ -127,7 +127,7 @@ func (rm *routeManager) handleSetupConn(conn net.Conn) error { case setup.PacketLoopClosed: err = rm.loopClosed(body) case setup.PacketRequestRouteID: - respBody, err = rm.occupyRouteID() + respBody, err = rm.occupyRouteID(body) default: err = errors.New("unknown foundation packet") } @@ -312,12 +312,20 @@ func (rm *routeManager) loopClosed(data []byte) error { return rm.conf.OnLoopClosed(ld.Loop) } -func (rm *routeManager) occupyRouteID() ([]routing.RouteID, error) { - rule := routing.ForwardRule(DefaultRouteKeepAlive, 0, uuid.UUID{}, 0) - routeID, err := rm.rt.AddRule(rule) - if err != nil { +func (rm *routeManager) occupyRouteID(data []byte) ([]routing.RouteID, error) { + var n uint8 + if err := json.Unmarshal(data, &n); err != nil { return nil, err } - return []routing.RouteID{routeID}, nil + var ids = make([]routing.RouteID, n) + for i := range ids { + rule := routing.ForwardRule(DefaultRouteKeepAlive, 0, uuid.UUID{}, 0) + routeID, err := rm.rt.AddRule(rule) + if err != nil { + return nil, err + } + ids[i] = routeID + } + return ids, nil } diff --git a/pkg/router/route_manager_test.go b/pkg/router/route_manager_test.go index f40bc5e246..729e2c699b 100644 --- a/pkg/router/route_manager_test.go +++ b/pkg/router/route_manager_test.go @@ -114,26 +114,26 @@ func TestNewRouteManager(t *testing.T) { }() // Emulate SetupNode sending RequestRegistrationID request. - id, err := setup.RequestRouteID(context.TODO(), setup.NewSetupProtocol(requestIDIn)) + ids, err := setup.RequestRouteIDs(context.TODO(), setup.NewSetupProtocol(requestIDIn), 1) require.NoError(t, err) // Emulate SetupNode sending AddRule request. - rule := routing.ForwardRule(10*time.Minute, 3, uuid.New(), id) - err = setup.AddRule(context.TODO(), setup.NewSetupProtocol(addIn), rule) + rule := routing.ForwardRule(10*time.Minute, 3, uuid.New(), ids[0]) + err = setup.AddRules(context.TODO(), setup.NewSetupProtocol(addIn), []routing.Rule{rule}) require.NoError(t, err) // Check routing table state after AddRule. assert.Equal(t, 1, rt.Count()) - r, err := rt.Rule(id) + r, err := rt.Rule(ids[0]) require.NoError(t, err) assert.Equal(t, rule, r) // Emulate SetupNode sending RemoveRule request. - require.NoError(t, setup.DeleteRule(context.TODO(), setup.NewSetupProtocol(delIn), id)) + require.NoError(t, setup.DeleteRule(context.TODO(), setup.NewSetupProtocol(delIn), ids[0])) // Check routing table state after DeleteRule. 
assert.Equal(t, 0, rt.Count()) - r, err = rt.Rule(id) + r, err = rt.Rule(ids[0]) assert.Error(t, err) assert.Nil(t, r) } diff --git a/pkg/routing/rule.go b/pkg/routing/rule.go index ba23bbc4cc..a47b9b89f2 100644 --- a/pkg/routing/rule.go +++ b/pkg/routing/rule.go @@ -114,14 +114,22 @@ func (r Rule) SetRequestRouteID(id RouteID) { } func (r Rule) String() string { - if r.Type() == RuleApp { - return fmt.Sprintf("App: ", - r.RouteID(), r.RemotePK(), r.RemotePort(), r.LocalPort()) + switch r.Type() { + case RuleApp: + return fmt.Sprintf("APP(keyRtID:%d, resRtID:%d, rPK:%s, rPort:%d, lPort:%d)", + r.RequestRouteID(), r.RouteID(), r.RemotePK(), r.RemotePort(), r.LocalPort()) + case RuleForward: + return fmt.Sprintf("FWD(keyRtID:%d, nxtRtID:%d, nxtTpID:%s)", + r.RequestRouteID(), r.RouteID(), r.TransportID()) + default: + return "invalid rule" } - - return fmt.Sprintf("Forward: ", r.RouteID(), r.TransportID()) } +//func (r Rule) MarshalJSON() ([]byte, error) { +// return json.Marshal(r.String()) +//} + // RuleAppFields summarizes App fields of a RoutingRule. type RuleAppFields struct { RespRID RouteID `json:"resp_rid"` diff --git a/pkg/setup/idreservoir.go b/pkg/setup/idreservoir.go new file mode 100644 index 0000000000..467192c6cd --- /dev/null +++ b/pkg/setup/idreservoir.go @@ -0,0 +1,164 @@ +package setup + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "sync" + "time" + + "github.com/skycoin/dmsg/cipher" + + "github.com/skycoin/skywire/pkg/routing" +) + +type idReservoir struct { + rec map[cipher.PubKey]uint8 + ids map[cipher.PubKey][]routing.RouteID + mx sync.Mutex +} + +func newIDReservoir(routes ...routing.Route) (*idReservoir, int) { + rec := make(map[cipher.PubKey]uint8) + var total int + + for _, rt := range routes { + if len(rt) == 0 { + continue + } + rec[rt[0].From]++ + for _, hop := range rt { + rec[hop.To]++ + } + total += len(rt) + 1 + } + + return &idReservoir{ + rec: rec, + ids: make(map[cipher.PubKey][]routing.RouteID), + }, total +} + +func (idr *idReservoir) ReserveIDs(ctx context.Context, reserve func(ctx context.Context, pk cipher.PubKey, n uint8) ([]routing.RouteID, error)) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + errCh := make(chan error, len(idr.rec)) + defer close(errCh) + + for pk, n := range idr.rec { + pk, n := pk, n + go func() { + ids, err := reserve(ctx, pk, n) + if err != nil { + errCh <- fmt.Errorf("reserve routeID from %s failed: %v", pk, err) + return + } + idr.mx.Lock() + idr.ids[pk] = ids + idr.mx.Unlock() + errCh <- nil + }() + } + + return finalError(len(idr.rec), errCh) +} + +func (idr *idReservoir) PopID(pk cipher.PubKey) (routing.RouteID, bool) { + idr.mx.Lock() + defer idr.mx.Unlock() + + ids, ok := idr.ids[pk] + if !ok || len(ids) == 0 { + return 0, false + } + + idr.ids[pk] = ids[1:] + return ids[0], true +} + +// RulesMap associates a slice of rules to a visor's public key. +type RulesMap map[cipher.PubKey][]routing.Rule + +func (rm RulesMap) String() string { + out := make(map[cipher.PubKey][]string, len(rm)) + for pk, rules := range rm { + str := make([]string, len(rules)) + for i, rule := range rules { + str[i] = rule.String() + } + out[pk] = str + } + jb, err := json.MarshalIndent(out, "", "\t") + if err != nil { + panic(err) + } + return string(jb) +} + +// GenerateRules generates rules for a given LoopDescriptor. +// The outputs are as follows: +// - rules: a map that relates a slice of routing rules to a given visor's public key. 
+// - srcFwdRID: the initiating visor's route ID which references the first rule of the forward route.
+// - dstFwdRID: the responding visor's route ID which references the first rule of the reverse route.
+// - err: an error (if any).
+func GenerateRules(idc *idReservoir, ld routing.LoopDescriptor) (rules RulesMap, srcFwdRID, dstFwdRID routing.RouteID, err error) {
+	rules = make(RulesMap)
+	src, dst := ld.Loop.Local, ld.Loop.Remote
+
+	firstFwdRID, lastFwdRID, err := SaveForwardRules(rules, idc, ld.KeepAlive, ld.Forward)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+	firstRevRID, lastRevRID, err := SaveForwardRules(rules, idc, ld.KeepAlive, ld.Reverse)
+	if err != nil {
+		return nil, 0, 0, err
+	}
+
+	rules[src.PubKey] = append(rules[src.PubKey],
+		routing.AppRule(ld.KeepAlive, firstRevRID, lastFwdRID, dst.PubKey, src.Port, dst.Port))
+	rules[dst.PubKey] = append(rules[dst.PubKey],
+		routing.AppRule(ld.KeepAlive, firstFwdRID, lastRevRID, src.PubKey, dst.Port, src.Port))
+
+	return rules, firstFwdRID, firstRevRID, nil
+}
+
+// SaveForwardRules creates the rules of the given route, and saves them in the 'rules' input.
+// Note that the last rule for the route is always an APP rule, and so is not created here.
+// The outputs are as follows:
+// - firstRID: the first visor's route ID.
+// - lastRID: the last visor's route ID (note that there is no rule set for this ID yet).
+// - err: an error (if any).
+func SaveForwardRules(rules RulesMap, idc *idReservoir, keepAlive time.Duration, route routing.Route) (firstRID, lastRID routing.RouteID, err error) {
+
+	// 'firstRID' is the first visor's key route ID - this is to be returned.
+	var ok bool
+	if firstRID, ok = idc.PopID(route[0].From); !ok {
+		return 0, 0, errors.New("no route ID was reserved for the first visor of the route")
+	}
+
+	var rID = firstRID
+	for _, hop := range route {
+		nxtRID, ok := idc.PopID(hop.To)
+		if !ok {
+			return 0, 0, errors.New("no route ID was reserved for the next visor of the route")
+		}
+		rule := routing.ForwardRule(keepAlive, nxtRID, hop.Transport, rID)
+		rules[hop.From] = append(rules[hop.From], rule)
+
+		rID = nxtRID
+	}
+
+	return firstRID, rID, nil
+}
+
+func finalError(n int, errCh <-chan error) error {
+	var finalErr error
+	for i := 0; i < n; i++ {
+		if err := <-errCh; err != nil {
+			finalErr = err
+		}
+	}
+	return finalErr
+}
diff --git a/pkg/setup/node.go b/pkg/setup/node.go
index f37e0c1c9b..c522a73297 100644
--- a/pkg/setup/node.go
+++ b/pkg/setup/node.go
@@ -7,8 +7,6 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/google/uuid"
-
 	"github.com/skycoin/skywire/pkg/snet"
 
 	"github.com/skycoin/dmsg"
@@ -67,6 +65,14 @@ func NewNode(conf *Config, metrics metrics.Recorder) (*Node, error) {
 	}, nil
 }
 
+// Close closes underlying dmsg client.
+func (sn *Node) Close() error {
+	if sn == nil {
+		return nil
+	}
+	return sn.dmsgC.Close()
+}
+
 // Serve starts transport listening loop.
func (sn *Node) Serve(ctx context.Context) error { sn.Logger.Info("serving setup node") @@ -105,16 +111,19 @@ func (sn *Node) handleRequest(ctx context.Context, tr *dmsg.Transport) error { if err = json.Unmarshal(data, &ld); err != nil { break } - ldJson, _ := json.MarshalIndent(ld, "", "\t") - log.Infof("CreateLoop loop descriptor: %s", string(ldJson)) - err = sn.createLoop(ctx, ld) + ldJSON, jErr := json.MarshalIndent(ld, "", "\t") + if jErr != nil { + panic(jErr) + } + log.Infof("CreateLoop loop descriptor: %s", string(ldJSON)) + err = sn.handleCreateLoop(ctx, ld) case PacketCloseLoop: var ld routing.LoopData if err = json.Unmarshal(data, &ld); err != nil { break } - err = sn.closeLoop(ctx, ld.Loop.Remote.PubKey, routing.LoopData{ + err = sn.handleCloseLoop(ctx, ld.Loop.Remote.PubKey, routing.LoopData{ Loop: routing.Loop{ Remote: ld.Loop.Local, Local: ld.Loop.Remote, @@ -135,224 +144,107 @@ func (sn *Node) handleRequest(ctx context.Context, tr *dmsg.Transport) error { return proto.WritePacket(RespSuccess, nil) } -func (sn *Node) createLoop(ctx context.Context, ld routing.LoopDescriptor) error { - sn.Logger.Infof("Creating new Loop %s", ld) - rRouteID, err := sn.createRoute(ctx, ld.KeepAlive, ld.Reverse, ld.Loop.Local.Port, ld.Loop.Remote.Port) +func (sn *Node) handleCreateLoop(ctx context.Context, ld routing.LoopDescriptor) error { + src := ld.Loop.Local + dst := ld.Loop.Remote + + // Reserve route IDs from visors. + idr, err := sn.reserveRouteIDs(ctx, ld.Forward, ld.Reverse) if err != nil { return err } - fRouteID, err := sn.createRoute(ctx, ld.KeepAlive, ld.Forward, ld.Loop.Remote.Port, ld.Loop.Local.Port) + // Determine the rules to send to visors using loop descriptor and reserved route IDs. + rulesMap, srcFwdRID, dstFwdRID, err := GenerateRules(idr, ld) if err != nil { return err } + sn.Logger.Infof("generated rules: %v", rulesMap) - if len(ld.Forward) == 0 || len(ld.Reverse) == 0 { - return nil - } - - initiator := ld.Initiator() - responder := ld.Responder() - - ldR := routing.LoopData{ - Loop: routing.Loop{ - Remote: routing.Addr{ - PubKey: initiator, - Port: ld.Loop.Local.Port, - }, - Local: routing.Addr{ - PubKey: responder, - Port: ld.Loop.Remote.Port, - }, - }, - RouteID: rRouteID, - } - if err := sn.connectLoop(ctx, responder, ldR); err != nil { - sn.Logger.Warnf("Failed to confirm loop with responder: %s", err) - return fmt.Errorf("loop connect: %s", err) - } - - ldI := routing.LoopData{ - Loop: routing.Loop{ - Remote: routing.Addr{ - PubKey: responder, - Port: ld.Loop.Remote.Port, - }, - Local: routing.Addr{ - PubKey: initiator, - Port: ld.Loop.Local.Port, - }, - }, - RouteID: fRouteID, - } - if err := sn.connectLoop(ctx, initiator, ldI); err != nil { - sn.Logger.Warnf("Failed to confirm loop with initiator: %s", err) - if err := sn.closeLoop(ctx, responder, ldR); err != nil { - sn.Logger.Warnf("Failed to close loop: %s", err) - } - return fmt.Errorf("loop connect: %s", err) - } - - sn.Logger.Infof("Created Loop %s", ld) - return nil -} - -// createRoute setups the route. Route setup involves applying routing rules to each visor node along the route. -// Each rule applying procedure consists of two steps: -// 1. Request free route ID from the visor node -// 2. Apply the rule, using route ID from the step 1 to register this rule inside the visor node -// -// Route ID received as a response after 1st step is used in two rules. 1st, it's used in the rule being applied -// to the current visor node as a route ID to register this rule within the visor node. 
-// 2nd, it's used in the rule being applied to the previous visor node along the route as a `respRouteID/nextRouteID`. -// For this reason, each 2nd step must wait for completion of its 1st step and the 1st step of the next visor node -// along the route to be able to obtain route ID from there. IDs serving as `respRouteID/nextRouteID` are being -// passed in a fan-like fashion. -// -// Example. Let's say, we have N visor nodes along the route. Visor[0] is the initiator. Setup node sends N requests to -// each visor along the route according to the 1st step and gets N route IDs in response. Then we assemble N rules to -// be applied. We construct each rule as the following: -// - Rule[0..N-1] are of type `ForwardRule`; -// - Rule[N] is of type `AppRule`; -// - For i = 0..N-1 rule[i] takes `nextTransportID` from the rule[i+1]; -// - For i = 0..N-1 rule[i] takes `respRouteID/nextRouteID` from rule[i+1] (after [i+1] request for free route ID -// completes; -// - Rule[N] has `respRouteID/nextRouteID` equal to 0; -// Rule[0..N] use their route ID retrieved from the 1st step to be registered within the corresponding visor node. -// -// During the setup process each error received along the way causes all the procedure to be canceled. RouteID received -// from the 1st step connecting to the initiating node is used as the ID for the overall rule, thus being returned. -func (sn *Node) createRoute(ctx context.Context, keepAlive time.Duration, route routing.Route, rPort, lPort routing.Port) (routing.RouteID, error) { - if len(route) == 0 { - return 0, nil - } - - sn.Logger.Infof("Creating a new Route %s", route) - - // add the initiating node to the start of the route. We need to loop over all the visor nodes - // along the route to apply rules including the initiating one - r := make(routing.Route, len(route)+1) - r[0] = &routing.Hop{ - Transport: route[0].Transport, - To: route[0].From, - } - copy(r[1:], route) - - init := route[0].From - - // indicate errors occurred during rules setup - rulesSetupErrs := make(chan error, len(r)) - // reqIDsCh is an array of chans used to pass the requested route IDs around the goroutines. - // We do it in a fan fashion here. We create as many goroutines as there are rules to be applied. - // Goroutine[i] requests visor node for a free route ID. It passes this route ID through a chan to - // a goroutine[i-1]. In turn, goroutine[i-1] waits for a route ID from chan[i]. - // Thus, goroutine[len(r)] doesn't get a route ID and uses 0 instead, goroutine[0] doesn't pass - // its route ID to anyone - reqIDsCh := make([]chan routing.RouteID, 0, len(r)) - for range r { - reqIDsCh = append(reqIDsCh, make(chan routing.RouteID, 2)) - } - - // chan to receive the resulting route ID from a goroutine - resultingRouteIDCh := make(chan routing.RouteID, 2) - - // context to cancel rule setup in case of errors - ctx, cancel := context.WithCancel(ctx) - for i := len(r) - 1; i >= 0; i-- { - var reqIDChIn, reqIDChOut chan routing.RouteID - // goroutine[0] doesn't need to pass the route ID from the 1st step to anyone - if i > 0 { - reqIDChOut = reqIDsCh[i-1] - } - var ( - nextTpID uuid.UUID - rule routing.Rule - ) - // goroutine[len(r)-1] uses 0 as the route ID from the 1st step - if i != len(r)-1 { - reqIDChIn = reqIDsCh[i] - nextTpID = r[i+1].Transport - rule = routing.ForwardRule(keepAlive, 0, nextTpID, 0) - } else { - rule = routing.AppRule(keepAlive, 0, 0, init, lPort, rPort) - } + // Add rules to visors. 
+	errCh := make(chan error, len(rulesMap))
+	defer close(errCh)
+	for pk, rules := range rulesMap {
+		pk, rules := pk, rules
+		go func() {
+			log := sn.Logger.WithField("remote", pk)
 
-		go func(i int, pk cipher.PubKey, rule routing.Rule, reqIDChIn <-chan routing.RouteID, reqIDChOut chan<- routing.RouteID) {
-			routeID, err := sn.setupRule(ctx, pk, rule, reqIDChIn, reqIDChOut)
-			// adding rule for initiator must result with a route ID for the overall route
-			// it doesn't matter for now if there was an error, this result will be fetched only if there wasn't one
-			if i == 0 {
-				resultingRouteIDCh <- routeID
-			}
+			proto, err := sn.dialAndCreateProto(ctx, pk)
 			if err != nil {
-				// filter out context cancellation errors
-				if err == context.Canceled {
-					rulesSetupErrs <- err
-				} else {
-					rulesSetupErrs <- fmt.Errorf("rule setup: %s", err)
-				}
-
+				log.WithError(err).Warn("failed to create proto")
+				errCh <- err
 				return
 			}
+			defer sn.closeProto(proto)
+			log.Debug("proto created successfully")
 
-			rulesSetupErrs <- nil
-		}(i, r[i].To, rule, reqIDChIn, reqIDChOut)
+			if err := AddRules(ctx, proto, rules); err != nil {
+				log.WithError(err).Warn("failed to add rules")
+				errCh <- err
+				return
+			}
+			log.Debug("rules added")
+			errCh <- nil
+		}()
+	}
+	if err := finalError(len(rulesMap), errCh); err != nil {
+		return err
 	}
 
-	var rulesSetupErr error
-	// check for any errors occurred so far
-	for range r {
-		// filter out context cancellation errors
-		if err := <-rulesSetupErrs; err != nil && err != context.Canceled {
-			// rules setup failed, cancel further setup
-			cancel()
-			rulesSetupErr = err
+	// Confirm loop with responding visor.
+	err = func() error {
+		proto, err := sn.dialAndCreateProto(ctx, dst.PubKey)
+		if err != nil {
+			return err
 		}
-	}
-	cancel()
+		defer sn.closeProto(proto)
 
-	// close chan to avoid leaks
-	close(rulesSetupErrs)
-	for _, ch := range reqIDsCh {
-		close(ch)
-	}
-	if rulesSetupErr != nil {
-		return 0, rulesSetupErr
+		data := routing.LoopData{Loop: routing.Loop{Local: dst, Remote: src}, RouteID: dstFwdRID}
+		return ConfirmLoop(ctx, proto, data)
+	}()
+	if err != nil {
+		return fmt.Errorf("failed to confirm loop with destination visor: %v", err)
 	}
 
-	// value gets passed to the chan only if no errors occurred during the route establishment
-	// errors are being filtered above, so at the moment we get to this part, the value is
-	// guaranteed to be in the channel
-	routeID := <-resultingRouteIDCh
-	close(resultingRouteIDCh)
-
-	return routeID, nil
-}
+	// Confirm loop with initiating visor.
+	err = func() error {
+		proto, err := sn.dialAndCreateProto(ctx, src.PubKey)
+		if err != nil {
+			return err
+		}
+		defer sn.closeProto(proto)
 
-func (sn *Node) connectLoop(ctx context.Context, on cipher.PubKey, ld routing.LoopData) error {
-	proto, err := sn.dialAndCreateProto(ctx, on)
+		data := routing.LoopData{Loop: routing.Loop{Local: src, Remote: dst}, RouteID: srcFwdRID}
+		return ConfirmLoop(ctx, proto, data)
+	}()
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to confirm loop with initiating visor: %v", err)
 	}
-	defer sn.closeProto(proto)
 
-	if err := ConfirmLoop(ctx, proto, ld); err != nil {
-		return err
-	}
-
-	sn.Logger.Infof("Confirmed loop on %s with %s. RemotePort: %d. LocalPort: %d", on, ld.Loop.Remote.PubKey, ld.Loop.Remote.Port, ld.Loop.Local.Port)
 	return nil
 }
 
-// Close closes underlying dmsg client.
-func (sn *Node) Close() error { - if sn == nil { - return nil +func (sn *Node) reserveRouteIDs(ctx context.Context, fwd, rev routing.Route) (*idReservoir, error) { + idc, total := newIDReservoir(fwd, rev) + sn.Logger.Infof("There are %d route IDs to reserve.", total) + + err := idc.ReserveIDs(ctx, func(ctx context.Context, pk cipher.PubKey, n uint8) ([]routing.RouteID, error) { + proto, err := sn.dialAndCreateProto(ctx, pk) + if err != nil { + return nil, err + } + defer sn.closeProto(proto) + return RequestRouteIDs(ctx, proto, n) + }) + if err != nil { + sn.Logger.WithError(err).Warnf("Failed to reserve route IDs.") + return nil, err } - return sn.dmsgC.Close() + sn.Logger.Infof("Successfully reserved route IDs.") + return idc, err } -func (sn *Node) closeLoop(ctx context.Context, on cipher.PubKey, ld routing.LoopData) error { +func (sn *Node) handleCloseLoop(ctx context.Context, on cipher.PubKey, ld routing.LoopData) error { proto, err := sn.dialAndCreateProto(ctx, on) if err != nil { return err @@ -367,64 +259,7 @@ func (sn *Node) closeLoop(ctx context.Context, on cipher.PubKey, ld routing.Loop return nil } -func (sn *Node) setupRule(ctx context.Context, pk cipher.PubKey, rule routing.Rule, reqIDChIn <-chan routing.RouteID, reqIDChOut chan<- routing.RouteID) (routing.RouteID, error) { - sn.Logger.Debugf("trying to setup setup rule: %v with %s", rule, pk) - requestRouteID, err := sn.requestRouteID(ctx, pk) // take this. - if err != nil { - return 0, err - } - - if reqIDChOut != nil { - reqIDChOut <- requestRouteID - } - var nextRouteID routing.RouteID - if reqIDChIn != nil { - nextRouteID = <-reqIDChIn - rule.SetRouteID(nextRouteID) - } - - rule.SetRequestRouteID(requestRouteID) - - sn.Logger.Debugf("dialing to %s to setup rule: %v", pk, rule) - - if err := sn.addRule(ctx, pk, rule); err != nil { - return 0, err - } - - sn.Logger.Infof("Set rule of type %s on %s", rule.Type(), pk) - - return requestRouteID, nil -} - -func (sn *Node) requestRouteID(ctx context.Context, pk cipher.PubKey) (routing.RouteID, error) { - proto, err := sn.dialAndCreateProto(ctx, pk) - if err != nil { - return 0, err - } - defer sn.closeProto(proto) - - requestRouteID, err := RequestRouteID(ctx, proto) - if err != nil { - return 0, err - } - - sn.Logger.Infof("Received route ID %d from %s", requestRouteID, pk) - - return requestRouteID, nil -} - -func (sn *Node) addRule(ctx context.Context, pk cipher.PubKey, rule routing.Rule) error { - proto, err := sn.dialAndCreateProto(ctx, pk) - if err != nil { - return err - } - defer sn.closeProto(proto) - - return AddRule(ctx, proto, rule) -} - func (sn *Node) dialAndCreateProto(ctx context.Context, pk cipher.PubKey) (*Protocol, error) { - sn.Logger.Debugf("dialing to %s\n", pk) tr, err := sn.dmsgC.Dial(ctx, pk, snet.AwaitSetupPort) if err != nil { return nil, fmt.Errorf("transport: %s", err) diff --git a/pkg/setup/node_test.go b/pkg/setup/node_test.go index 78b5567fc0..7f765e2e3d 100644 --- a/pkg/setup/node_test.go +++ b/pkg/setup/node_test.go @@ -3,9 +3,24 @@ package setup import ( + "context" + "encoding/json" + "errors" + "fmt" "log" "os" "testing" + "time" + + "github.com/skycoin/dmsg" + "github.com/skycoin/dmsg/cipher" + "github.com/skycoin/dmsg/disc" + "github.com/stretchr/testify/require" + "golang.org/x/net/nettest" + + "github.com/skycoin/skywire/pkg/metrics" + "github.com/skycoin/skywire/pkg/routing" + "github.com/skycoin/skywire/pkg/snet" "github.com/skycoin/skycoin/src/util/logging" ) @@ -40,324 +55,324 @@ func TestMain(m *testing.M) { // 3. 
Hanging may be not the problem of the DMSG. Probably some of the communication part here is wrong. // The reason I think so is that - if we ensure read timeouts, why doesn't this test constantly fail? // Maybe some wrapper for DMSG is wrong, or some internal operations before the actual communication behave bad -//func TestNode(t *testing.T) { -// // Prepare mock dmsg discovery. -// discovery := disc.NewMock() -// -// // Prepare dmsg server. -// server, serverErr := createServer(t, discovery) -// defer func() { -// require.NoError(t, server.Close()) -// require.NoError(t, errWithTimeout(serverErr)) -// }() -// -// type clientWithDMSGAddrAndListener struct { -// *dmsg.Client -// Addr dmsg.Addr -// Listener *dmsg.Listener -// } -// -// // CLOSURE: sets up dmsg clients. -// prepClients := func(n int) ([]clientWithDMSGAddrAndListener, func()) { -// clients := make([]clientWithDMSGAddrAndListener, n) -// for i := 0; i < n; i++ { -// var port uint16 -// // setup node -// if i == 0 { -// port = snet.SetupPort -// } else { -// port = snet.AwaitSetupPort -// } -// pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte{byte(i)}) -// require.NoError(t, err) -// t.Logf("client[%d] PK: %s\n", i, pk) -// c := dmsg.NewClient(pk, sk, discovery, dmsg.SetLogger(logging.MustGetLogger(fmt.Sprintf("client_%d:%s:%d", i, pk, port)))) -// require.NoError(t, c.InitiateServerConnections(context.TODO(), 1)) -// listener, err := c.Listen(port) -// require.NoError(t, err) -// clients[i] = clientWithDMSGAddrAndListener{ -// Client: c, -// Addr: dmsg.Addr{ -// PK: pk, -// Port: port, -// }, -// Listener: listener, -// } -// } -// return clients, func() { -// for _, c := range clients { -// //require.NoError(t, c.Listener.Close()) -// require.NoError(t, c.Close()) -// } -// } -// } -// -// // CLOSURE: sets up setup node. -// prepSetupNode := func(c *dmsg.Client, listener *dmsg.Listener) (*Node, func()) { -// sn := &Node{ -// Logger: logging.MustGetLogger("setup_node"), -// dmsgC: c, -// dmsgL: listener, -// metrics: metrics.NewDummy(), -// } -// go func() { -// if err := sn.Serve(context.TODO()); err != nil { -// sn.Logger.WithError(err).Error("Failed to serve") -// } -// }() -// return sn, func() { -// require.NoError(t, sn.Close()) -// } -// } -// -// // TEST: Emulates the communication between 4 visor nodes and a setup node, -// // where the first client node initiates a loop to the last. -// t.Run("CreateLoop", func(t *testing.T) { -// // client index 0 is for setup node. -// // clients index 1 to 4 are for visor nodes. -// clients, closeClients := prepClients(5) -// defer closeClients() -// -// // prepare and serve setup node (using client 0). -// _, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener) -// setupPK := clients[0].Addr.PK -// setupPort := clients[0].Addr.Port -// defer closeSetup() -// -// // prepare loop creation (client_1 will use this to request loop creation with setup node). 
-// ld := routing.LoopDescriptor{ -// Loop: routing.Loop{ -// Local: routing.Addr{PubKey: clients[1].Addr.PK, Port: 1}, -// Remote: routing.Addr{PubKey: clients[4].Addr.PK, Port: 1}, -// }, -// Reverse: routing.Route{ -// &routing.Hop{From: clients[1].Addr.PK, To: clients[2].Addr.PK, Transport: uuid.New()}, -// &routing.Hop{From: clients[2].Addr.PK, To: clients[3].Addr.PK, Transport: uuid.New()}, -// &routing.Hop{From: clients[3].Addr.PK, To: clients[4].Addr.PK, Transport: uuid.New()}, -// }, -// Forward: routing.Route{ -// &routing.Hop{From: clients[4].Addr.PK, To: clients[3].Addr.PK, Transport: uuid.New()}, -// &routing.Hop{From: clients[3].Addr.PK, To: clients[2].Addr.PK, Transport: uuid.New()}, -// &routing.Hop{From: clients[2].Addr.PK, To: clients[1].Addr.PK, Transport: uuid.New()}, -// }, -// KeepAlive: 1 * time.Hour, -// } -// -// // client_1 initiates loop creation with setup node. -// iTp, err := clients[1].Dial(context.TODO(), setupPK, setupPort) -// require.NoError(t, err) -// iTpErrs := make(chan error, 2) -// go func() { -// iTpErrs <- CreateLoop(context.TODO(), NewSetupProtocol(iTp), ld) -// iTpErrs <- iTp.Close() -// close(iTpErrs) -// }() -// defer func() { -// i := 0 -// for err := range iTpErrs { -// require.NoError(t, err, i) -// i++ -// } -// }() -// -// var addRuleDone sync.WaitGroup -// var nextRouteID uint32 -// // CLOSURE: emulates how a visor node should react when expecting an AddRules packet. -// expectAddRules := func(client int, expRule routing.RuleType) { -// conn, err := clients[client].Listener.Accept() -// require.NoError(t, err) -// -// fmt.Printf("client %v:%v accepted\n", client, clients[client].Addr) -// -// proto := NewSetupProtocol(conn) -// -// pt, _, err := proto.ReadPacket() -// require.NoError(t, err) -// require.Equal(t, PacketRequestRouteID, pt) -// -// fmt.Printf("client %v:%v got PacketRequestRouteID\n", client, clients[client].Addr) -// -// routeID := atomic.AddUint32(&nextRouteID, 1) -// -// // TODO: This error is not checked due to a bug in dmsg. -// _ = proto.WritePacket(RespSuccess, []routing.RouteID{routing.RouteID(routeID)}) // nolint:errcheck -// require.NoError(t, err) -// -// fmt.Printf("client %v:%v responded to with registration ID: %v\n", client, clients[client].Addr, routeID) -// -// require.NoError(t, conn.Close()) -// -// conn, err = clients[client].Listener.Accept() -// require.NoError(t, err) -// -// fmt.Printf("client %v:%v accepted 2nd time\n", client, clients[client].Addr) -// -// proto = NewSetupProtocol(conn) -// -// pt, pp, err := proto.ReadPacket() -// require.NoError(t, err) -// require.Equal(t, PacketAddRules, pt) -// -// fmt.Printf("client %v:%v got PacketAddRules\n", client, clients[client].Addr) -// -// var rs []routing.Rule -// require.NoError(t, json.Unmarshal(pp, &rs)) -// -// for _, r := range rs { -// require.Equal(t, expRule, r.Type()) -// } -// -// // TODO: This error is not checked due to a bug in dmsg. -// err = proto.WritePacket(RespSuccess, nil) -// _ = err -// -// fmt.Printf("client %v:%v responded for PacketAddRules\n", client, clients[client].Addr) -// -// require.NoError(t, conn.Close()) -// -// addRuleDone.Done() -// } -// -// // CLOSURE: emulates how a visor node should react when expecting an OnConfirmLoop packet. 
-// expectConfirmLoop := func(client int) { -// tp, err := clients[client].Listener.AcceptTransport() -// require.NoError(t, err) -// -// proto := NewSetupProtocol(tp) -// -// pt, pp, err := proto.ReadPacket() -// require.NoError(t, err) -// require.Equal(t, PacketConfirmLoop, pt) -// -// var d routing.LoopData -// require.NoError(t, json.Unmarshal(pp, &d)) -// -// switch client { -// case 1: -// require.Equal(t, ld.Loop, d.Loop) -// case 4: -// require.Equal(t, ld.Loop.Local, d.Loop.Remote) -// require.Equal(t, ld.Loop.Remote, d.Loop.Local) -// default: -// t.Fatalf("We shouldn't be receiving a OnConfirmLoop packet from client %d", client) -// } -// -// // TODO: This error is not checked due to a bug in dmsg. -// err = proto.WritePacket(RespSuccess, nil) -// _ = err -// -// require.NoError(t, tp.Close()) -// } -// -// // since the route establishment is asynchronous, -// // we must expect all the messages in parallel -// addRuleDone.Add(4) -// go expectAddRules(4, routing.RuleApp) -// go expectAddRules(3, routing.RuleForward) -// go expectAddRules(2, routing.RuleForward) -// go expectAddRules(1, routing.RuleForward) -// addRuleDone.Wait() -// fmt.Println("FORWARD ROUTE DONE") -// addRuleDone.Add(4) -// go expectAddRules(1, routing.RuleApp) -// go expectAddRules(2, routing.RuleForward) -// go expectAddRules(3, routing.RuleForward) -// go expectAddRules(4, routing.RuleForward) -// addRuleDone.Wait() -// fmt.Println("REVERSE ROUTE DONE") -// expectConfirmLoop(1) -// expectConfirmLoop(4) -// }) -// -// // TEST: Emulates the communication between 2 visor nodes and a setup nodes, -// // where a route is already established, -// // and the first client attempts to tear it down. -// t.Run("CloseLoop", func(t *testing.T) { -// // client index 0 is for setup node. -// // clients index 1 and 2 are for visor nodes. -// clients, closeClients := prepClients(3) -// defer closeClients() -// -// // prepare and serve setup node. -// _, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener) -// setupPK := clients[0].Addr.PK -// setupPort := clients[0].Addr.Port -// defer closeSetup() -// -// // prepare loop data describing the loop that is to be closed. -// ld := routing.LoopData{ -// Loop: routing.Loop{ -// Local: routing.Addr{ -// PubKey: clients[1].Addr.PK, -// Port: 1, -// }, -// Remote: routing.Addr{ -// PubKey: clients[2].Addr.PK, -// Port: 2, -// }, -// }, -// RouteID: 3, -// } -// -// // client_1 initiates close loop with setup node. -// iTp, err := clients[1].Dial(context.TODO(), setupPK, setupPort) -// require.NoError(t, err) -// iTpErrs := make(chan error, 2) -// go func() { -// iTpErrs <- CloseLoop(context.TODO(), NewSetupProtocol(iTp), ld) -// iTpErrs <- iTp.Close() -// close(iTpErrs) -// }() -// defer func() { -// i := 0 -// for err := range iTpErrs { -// require.NoError(t, err, i) -// i++ -// } -// }() -// -// // client_2 accepts close request. -// tp, err := clients[2].Listener.AcceptTransport() -// require.NoError(t, err) -// defer func() { require.NoError(t, tp.Close()) }() -// -// proto := NewSetupProtocol(tp) -// -// pt, pp, err := proto.ReadPacket() -// require.NoError(t, err) -// require.Equal(t, PacketLoopClosed, pt) -// -// var d routing.LoopData -// require.NoError(t, json.Unmarshal(pp, &d)) -// require.Equal(t, ld.Loop.Remote, d.Loop.Local) -// require.Equal(t, ld.Loop.Local, d.Loop.Remote) -// -// // TODO: This error is not checked due to a bug in dmsg. 
-// err = proto.WritePacket(RespSuccess, nil) -// _ = err -// }) -//} -// -//func createServer(t *testing.T, dc disc.APIClient) (srv *dmsg.Server, srvErr <-chan error) { -// pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte("s")) -// require.NoError(t, err) -// l, err := nettest.NewLocalListener("tcp") -// require.NoError(t, err) -// srv, err = dmsg.NewServer(pk, sk, "", l, dc) -// require.NoError(t, err) -// errCh := make(chan error, 1) -// go func() { -// errCh <- srv.Serve() -// close(errCh) -// }() -// return srv, errCh -//} -// -//func errWithTimeout(ch <-chan error) error { -// select { -// case err := <-ch: -// return err -// case <-time.After(5 * time.Second): -// return errors.New("timeout") -// } -//} +func TestNode(t *testing.T) { + // Prepare mock dmsg discovery. + discovery := disc.NewMock() + + // Prepare dmsg server. + server, serverErr := createServer(t, discovery) + defer func() { + require.NoError(t, server.Close()) + require.NoError(t, errWithTimeout(serverErr)) + }() + + type clientWithDMSGAddrAndListener struct { + *dmsg.Client + Addr dmsg.Addr + Listener *dmsg.Listener + } + + // CLOSURE: sets up dmsg clients. + prepClients := func(n int) ([]clientWithDMSGAddrAndListener, func()) { + clients := make([]clientWithDMSGAddrAndListener, n) + for i := 0; i < n; i++ { + var port uint16 + // setup node + if i == 0 { + port = snet.SetupPort + } else { + port = snet.AwaitSetupPort + } + pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte{byte(i)}) + require.NoError(t, err) + t.Logf("client[%d] PK: %s\n", i, pk) + c := dmsg.NewClient(pk, sk, discovery, dmsg.SetLogger(logging.MustGetLogger(fmt.Sprintf("client_%d:%s:%d", i, pk, port)))) + require.NoError(t, c.InitiateServerConnections(context.TODO(), 1)) + listener, err := c.Listen(port) + require.NoError(t, err) + clients[i] = clientWithDMSGAddrAndListener{ + Client: c, + Addr: dmsg.Addr{ + PK: pk, + Port: port, + }, + Listener: listener, + } + } + return clients, func() { + for _, c := range clients { + //require.NoError(t, c.Listener.Close()) + require.NoError(t, c.Close()) + } + } + } + + // CLOSURE: sets up setup node. + prepSetupNode := func(c *dmsg.Client, listener *dmsg.Listener) (*Node, func()) { + sn := &Node{ + Logger: logging.MustGetLogger("setup_node"), + dmsgC: c, + dmsgL: listener, + metrics: metrics.NewDummy(), + } + go func() { + if err := sn.Serve(context.TODO()); err != nil { + sn.Logger.WithError(err).Error("Failed to serve") + } + }() + return sn, func() { + require.NoError(t, sn.Close()) + } + } + + //// TEST: Emulates the communication between 4 visor nodes and a setup node, + //// where the first client node initiates a loop to the last. + //t.Run("CreateLoop", func(t *testing.T) { + // // client index 0 is for setup node. + // // clients index 1 to 4 are for visor nodes. + // clients, closeClients := prepClients(5) + // defer closeClients() + // + // // prepare and serve setup node (using client 0). + // _, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener) + // setupPK := clients[0].Addr.PK + // setupPort := clients[0].Addr.Port + // defer closeSetup() + // + // // prepare loop creation (client_1 will use this to request loop creation with setup node). 
+ // ld := routing.LoopDescriptor{ + // Loop: routing.Loop{ + // Local: routing.Addr{PubKey: clients[1].Addr.PK, Port: 1}, + // Remote: routing.Addr{PubKey: clients[4].Addr.PK, Port: 1}, + // }, + // Reverse: routing.Route{ + // &routing.Hop{From: clients[1].Addr.PK, To: clients[2].Addr.PK, Transport: uuid.New()}, + // &routing.Hop{From: clients[2].Addr.PK, To: clients[3].Addr.PK, Transport: uuid.New()}, + // &routing.Hop{From: clients[3].Addr.PK, To: clients[4].Addr.PK, Transport: uuid.New()}, + // }, + // Forward: routing.Route{ + // &routing.Hop{From: clients[4].Addr.PK, To: clients[3].Addr.PK, Transport: uuid.New()}, + // &routing.Hop{From: clients[3].Addr.PK, To: clients[2].Addr.PK, Transport: uuid.New()}, + // &routing.Hop{From: clients[2].Addr.PK, To: clients[1].Addr.PK, Transport: uuid.New()}, + // }, + // KeepAlive: 1 * time.Hour, + // } + // + // // client_1 initiates loop creation with setup node. + // iTp, err := clients[1].Dial(context.TODO(), setupPK, setupPort) + // require.NoError(t, err) + // iTpErrs := make(chan error, 2) + // go func() { + // iTpErrs <- CreateLoop(context.TODO(), NewSetupProtocol(iTp), ld) + // iTpErrs <- iTp.Close() + // close(iTpErrs) + // }() + // defer func() { + // i := 0 + // for err := range iTpErrs { + // require.NoError(t, err, i) + // i++ + // } + // }() + // + // var addRuleDone sync.WaitGroup + // var nextRouteID uint32 + // // CLOSURE: emulates how a visor node should react when expecting an AddRules packet. + // expectAddRules := func(client int, expRule routing.RuleType) { + // conn, err := clients[client].Listener.Accept() + // require.NoError(t, err) + // + // fmt.Printf("client %v:%v accepted\n", client, clients[client].Addr) + // + // proto := NewSetupProtocol(conn) + // + // pt, _, err := proto.ReadPacket() + // require.NoError(t, err) + // require.Equal(t, PacketRequestRouteID, pt) + // + // fmt.Printf("client %v:%v got PacketRequestRouteID\n", client, clients[client].Addr) + // + // routeID := atomic.AddUint32(&nextRouteID, 1) + // + // // TODO: This error is not checked due to a bug in dmsg. + // _ = proto.WritePacket(RespSuccess, []routing.RouteID{routing.RouteID(routeID)}) // nolint:errcheck + // require.NoError(t, err) + // + // fmt.Printf("client %v:%v responded to with registration ID: %v\n", client, clients[client].Addr, routeID) + // + // require.NoError(t, conn.Close()) + // + // conn, err = clients[client].Listener.Accept() + // require.NoError(t, err) + // + // fmt.Printf("client %v:%v accepted 2nd time\n", client, clients[client].Addr) + // + // proto = NewSetupProtocol(conn) + // + // pt, pp, err := proto.ReadPacket() + // require.NoError(t, err) + // require.Equal(t, PacketAddRules, pt) + // + // fmt.Printf("client %v:%v got PacketAddRules\n", client, clients[client].Addr) + // + // var rs []routing.Rule + // require.NoError(t, json.Unmarshal(pp, &rs)) + // + // for _, r := range rs { + // require.Equal(t, expRule, r.Type()) + // } + // + // // TODO: This error is not checked due to a bug in dmsg. + // err = proto.WritePacket(RespSuccess, nil) + // _ = err + // + // fmt.Printf("client %v:%v responded for PacketAddRules\n", client, clients[client].Addr) + // + // require.NoError(t, conn.Close()) + // + // addRuleDone.Done() + // } + // + // // CLOSURE: emulates how a visor node should react when expecting an OnConfirmLoop packet. 
+ // expectConfirmLoop := func(client int) { + // tp, err := clients[client].Listener.AcceptTransport() + // require.NoError(t, err) + // + // proto := NewSetupProtocol(tp) + // + // pt, pp, err := proto.ReadPacket() + // require.NoError(t, err) + // require.Equal(t, PacketConfirmLoop, pt) + // + // var d routing.LoopData + // require.NoError(t, json.Unmarshal(pp, &d)) + // + // switch client { + // case 1: + // require.Equal(t, ld.Loop, d.Loop) + // case 4: + // require.Equal(t, ld.Loop.Local, d.Loop.Remote) + // require.Equal(t, ld.Loop.Remote, d.Loop.Local) + // default: + // t.Fatalf("We shouldn't be receiving a OnConfirmLoop packet from client %d", client) + // } + // + // // TODO: This error is not checked due to a bug in dmsg. + // err = proto.WritePacket(RespSuccess, nil) + // _ = err + // + // require.NoError(t, tp.Close()) + // } + // + // // since the route establishment is asynchronous, + // // we must expect all the messages in parallel + // addRuleDone.Add(4) + // go expectAddRules(4, routing.RuleApp) + // go expectAddRules(3, routing.RuleForward) + // go expectAddRules(2, routing.RuleForward) + // go expectAddRules(1, routing.RuleForward) + // addRuleDone.Wait() + // fmt.Println("FORWARD ROUTE DONE") + // addRuleDone.Add(4) + // go expectAddRules(1, routing.RuleApp) + // go expectAddRules(2, routing.RuleForward) + // go expectAddRules(3, routing.RuleForward) + // go expectAddRules(4, routing.RuleForward) + // addRuleDone.Wait() + // fmt.Println("REVERSE ROUTE DONE") + // expectConfirmLoop(1) + // expectConfirmLoop(4) + //}) + + // TEST: Emulates the communication between 2 visor nodes and a setup nodes, + // where a route is already established, + // and the first client attempts to tear it down. + t.Run("CloseLoop", func(t *testing.T) { + // client index 0 is for setup node. + // clients index 1 and 2 are for visor nodes. + clients, closeClients := prepClients(3) + defer closeClients() + + // prepare and serve setup node. + _, closeSetup := prepSetupNode(clients[0].Client, clients[0].Listener) + setupPK := clients[0].Addr.PK + setupPort := clients[0].Addr.Port + defer closeSetup() + + // prepare loop data describing the loop that is to be closed. + ld := routing.LoopData{ + Loop: routing.Loop{ + Local: routing.Addr{ + PubKey: clients[1].Addr.PK, + Port: 1, + }, + Remote: routing.Addr{ + PubKey: clients[2].Addr.PK, + Port: 2, + }, + }, + RouteID: 3, + } + + // client_1 initiates close loop with setup node. + iTp, err := clients[1].Dial(context.TODO(), setupPK, setupPort) + require.NoError(t, err) + iTpErrs := make(chan error, 2) + go func() { + iTpErrs <- CloseLoop(context.TODO(), NewSetupProtocol(iTp), ld) + iTpErrs <- iTp.Close() + close(iTpErrs) + }() + defer func() { + i := 0 + for err := range iTpErrs { + require.NoError(t, err, i) + i++ + } + }() + + // client_2 accepts close request. + tp, err := clients[2].Listener.AcceptTransport() + require.NoError(t, err) + defer func() { require.NoError(t, tp.Close()) }() + + proto := NewSetupProtocol(tp) + + pt, pp, err := proto.ReadPacket() + require.NoError(t, err) + require.Equal(t, PacketLoopClosed, pt) + + var d routing.LoopData + require.NoError(t, json.Unmarshal(pp, &d)) + require.Equal(t, ld.Loop.Remote, d.Loop.Local) + require.Equal(t, ld.Loop.Local, d.Loop.Remote) + + // TODO: This error is not checked due to a bug in dmsg. 
+ err = proto.WritePacket(RespSuccess, nil) + _ = err + }) +} + +func createServer(t *testing.T, dc disc.APIClient) (srv *dmsg.Server, srvErr <-chan error) { + pk, sk, err := cipher.GenerateDeterministicKeyPair([]byte("s")) + require.NoError(t, err) + l, err := nettest.NewLocalListener("tcp") + require.NoError(t, err) + srv, err = dmsg.NewServer(pk, sk, "", l, dc) + require.NoError(t, err) + errCh := make(chan error, 1) + go func() { + errCh <- srv.Serve() + close(errCh) + }() + return srv, errCh +} + +func errWithTimeout(ch <-chan error) error { + select { + case err := <-ch: + return err + case <-time.After(5 * time.Second): + return errors.New("timeout") + } +} diff --git a/pkg/setup/protocol.go b/pkg/setup/protocol.go index 8167c27beb..8421406d97 100644 --- a/pkg/setup/protocol.go +++ b/pkg/setup/protocol.go @@ -34,7 +34,7 @@ func (sp PacketType) String() string { case RespFailure: return "Failure" case PacketRequestRouteID: - return "RequestRouteID" + return "RequestRouteIDs" } return fmt.Sprintf("Unknown(%d)", sp) } @@ -52,7 +52,7 @@ const ( PacketCloseLoop // PacketLoopClosed represents OnLoopClosed foundation packet. PacketLoopClosed - // PacketRequestRouteID represents RequestRouteID foundation packet. + // PacketRequestRouteID represents RequestRouteIDs foundation packet. PacketRequestRouteID // RespFailure represents failure response for a foundation packet. @@ -113,24 +113,24 @@ func (p *Protocol) Close() error { return nil } -// RequestRouteID sends RequestRouteID request. -func RequestRouteID(ctx context.Context, p *Protocol) (routing.RouteID, error) { - if err := p.WritePacket(PacketRequestRouteID, nil); err != nil { - return 0, err +// RequestRouteIDs sends RequestRouteIDs request. +func RequestRouteIDs(ctx context.Context, p *Protocol, n uint8) ([]routing.RouteID, error) { + if err := p.WritePacket(PacketRequestRouteID, n); err != nil { + return nil, err } var res []routing.RouteID if err := readAndDecodePacketWithTimeout(ctx, p, &res); err != nil { - return 0, err + return nil, err } - if len(res) == 0 { - return 0, errors.New("empty response") + if len(res) != int(n) { + return nil, errors.New("invalid response: wrong number of routeIDs") } - return res[0], nil + return res, nil } -// AddRule sends AddRule setup request. -func AddRule(ctx context.Context, p *Protocol, rule routing.Rule) error { - if err := p.WritePacket(PacketAddRules, []routing.Rule{rule}); err != nil { +// AddRules sends AddRule setup request. 
+func AddRules(ctx context.Context, p *Protocol, rules []routing.Rule) error { + if err := p.WritePacket(PacketAddRules, rules); err != nil { return err } return readAndDecodePacketWithTimeout(ctx, p, nil) @@ -197,6 +197,9 @@ func readAndDecodePacketWithTimeout(ctx context.Context, p *Protocol, v interfac case <-ctx.Done(): return ctx.Err() case <-done: + if err == io.ErrClosedPipe { + return nil + } return err } } diff --git a/pkg/visor/rpc_client.go b/pkg/visor/rpc_client.go index a2e0c8e951..c9464b55af 100644 --- a/pkg/visor/rpc_client.go +++ b/pkg/visor/rpc_client.go @@ -12,6 +12,7 @@ import ( "github.com/google/uuid" "github.com/skycoin/dmsg/cipher" "github.com/skycoin/skycoin/src/util/logging" + "github.com/skycoin/skywire/pkg/app" "github.com/skycoin/skywire/pkg/router" "github.com/skycoin/skywire/pkg/routing" From 6439c70ee1ca10d2086c3721fe364800c27c00aa Mon Sep 17 00:00:00 2001 From: Evan Lin Date: Mon, 9 Sep 2019 02:50:28 +0800 Subject: [PATCH 3/8] Re-added initTransports for transport.Manager This was removed for some reason, but it needs to exist in order to reestablish transports on visor restart. --- pkg/transport/manager.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/pkg/transport/manager.go b/pkg/transport/manager.go index ccc246207a..3b2b1307eb 100644 --- a/pkg/transport/manager.go +++ b/pkg/transport/manager.go @@ -100,6 +100,8 @@ func (tm *Manager) serve(ctx context.Context) { } }() } + + tm.initTransports(ctx) tm.Logger.Info("transport manager is serving.") // closing logic @@ -116,6 +118,26 @@ func (tm *Manager) serve(ctx context.Context) { } } +func (tm *Manager) initTransports(ctx context.Context) { + tm.mx.Lock() + defer tm.mx.Unlock() + + entries, err := tm.conf.DiscoveryClient.GetTransportsByEdge(ctx, tm.conf.PubKey) + if err != nil { + log.Warnf("No transports found for local node: %v", err) + } + for _, entry := range entries { + var ( + tpType = entry.Entry.Type + remote = entry.Entry.RemoteEdge(tm.conf.PubKey) + tpID = entry.Entry.ID + ) + if _, err := tm.saveTransport(remote, tpType); err != nil { + tm.Logger.Warnf("INIT: failed to init tp: type(%s) remote(%s) tpID(%s)", tpType, remote, tpID) + } + } +} + func (tm *Manager) acceptTransport(ctx context.Context, lis *snet.Listener) error { conn, err := lis.AcceptConn() // TODO: tcp panic. 
 	if err != nil {

From 8273b45cd04c3d862e170316d84c13ac106efc51 Mon Sep 17 00:00:00 2001
From: ivcosla
Date: Tue, 10 Sep 2019 09:44:19 +0200
Subject: [PATCH 4/8] changes to api

---
 pkg/route-finder/client/client.go | 49 ++++++++++++++-----------------
 pkg/route-finder/client/mock.go   | 31 ++++++++++---------
 pkg/router/router.go              |  2 +-
 3 files changed, 38 insertions(+), 44 deletions(-)

diff --git a/pkg/route-finder/client/client.go b/pkg/route-finder/client/client.go
index 6fa5b53357..c9d6f15648 100644
--- a/pkg/route-finder/client/client.go
+++ b/pkg/route-finder/client/client.go
@@ -21,18 +21,15 @@ const defaultContextTimeout = 10 * time.Second
 
 var log = logging.MustGetLogger("route-finder")
 
-// GetRoutesRequest parses json body for /routes endpoint request
-type GetRoutesRequest struct {
-	SrcPK   cipher.PubKey `json:"src_pk,omitempty"`
-	DstPK   cipher.PubKey `json:"dst_pk,omitempty"`
-	MinHops uint16        `json:"min_hops,omitempty"`
-	MaxHops uint16        `json:"max_hops,omitempty"`
+type RouteOptions struct {
+	MinHops uint16
+	MaxHops uint16
 }
 
-// GetRoutesResponse encodes the json body of /routes response
-type GetRoutesResponse struct {
-	Forward []routing.Route `json:"forward"`
-	Reverse []routing.Route `json:"response"`
+// GetRoutesRequest parses json body for /routes endpoint request
+type FindRoutesRequest struct {
+	Edges [][2]cipher.PubKey
+	Opts  *RouteOptions
 }
 
 // HTTPResponse represents http response struct
@@ -49,7 +46,7 @@ type HTTPError struct {
 
 // Client implements route finding operations.
 type Client interface {
-	PairedRoutes(source, destiny cipher.PubKey, minHops, maxHops uint16) ([]routing.Route, []routing.Route, error)
+	FindRoutes(ctx context.Context, rts [][2]cipher.PubKey, opts *RouteOptions) ([][]routing.Route, error)
 }
 
 // APIClient implements Client interface
@@ -72,23 +69,21 @@ func NewHTTP(addr string, apiTimeout time.Duration) Client {
 	}
 }
 
-// PairedRoutes returns routes from source skywire visor to destiny, that has at least the given minHops and as much
-// the given maxHops as well as the reverse routes from destiny to source.
-func (c *apiClient) PairedRoutes(source, destiny cipher.PubKey, minHops, maxHops uint16) ([]routing.Route, []routing.Route, error) {
-	requestBody := &GetRoutesRequest{
-		SrcPK:   source,
-		DstPK:   destiny,
-		MinHops: minHops,
-		MaxHops: maxHops,
+// FindRoutes returns routes from a source skywire visor to a destination visor that have at least the given minHops
+// and at most the given maxHops.
+func (c *apiClient) FindRoutes(ctx context.Context, rts [][2]cipher.PubKey, opts *RouteOptions) ([][]routing.Route, error) {
+	requestBody := &FindRoutesRequest{
+		Edges: rts,
+		Opts:  opts,
 	}
 	marshaledBody, err := json.Marshal(requestBody)
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 
 	req, err := http.NewRequest(http.MethodGet, c.addr+"/routes", bytes.NewBuffer(marshaledBody))
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 	req.Header.Set("Content-Type", "application/json")
 	ctx, cancel := context.WithTimeout(context.Background(), c.apiTimeout)
@@ -104,7 +99,7 @@ func (c *apiClient) PairedRoutes(source, destiny cipher.PubKey, minHops, maxHops
 		}()
 	}
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 
 	if res.StatusCode != http.StatusOK {
@@ -112,19 +107,19 @@ func (c *apiClient) PairedRoutes(source, destiny cipher.PubKey, minHops, maxHops
 
 		err = json.NewDecoder(res.Body).Decode(&apiErr)
 		if err != nil {
-			return nil, nil, err
+			return nil, err
 		}
 
-		return nil, nil, errors.New(apiErr.Error.Message)
+		return nil, errors.New(apiErr.Error.Message)
 	}
 
-	var routes GetRoutesResponse
+	var routes [][]routing.Route
 	err = json.NewDecoder(res.Body).Decode(&routes)
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 
-	return routes.Forward, routes.Reverse, nil
+	return routes, nil
 }
 
 func sanitizedAddr(addr string) string {
diff --git a/pkg/route-finder/client/mock.go b/pkg/route-finder/client/mock.go
index 3184b2b91a..c590c8c050 100644
--- a/pkg/route-finder/client/mock.go
+++ b/pkg/route-finder/client/mock.go
@@ -1,6 +1,8 @@
 package client
 
 import (
+	"context"
+	"fmt"
 	"github.com/skycoin/dmsg/cipher"
 
 	"github.com/skycoin/skywire/pkg/routing"
@@ -23,27 +25,24 @@ func (r *mockClient) SetError(err error) {
 	r.err = err
 }
 
-// PairedRoutes implements Client for MockClient
-func (r *mockClient) PairedRoutes(src, dst cipher.PubKey, minHops, maxHops uint16) ([]routing.Route, []routing.Route, error) {
+// FindRoutes implements Client for MockClient
+func (r *mockClient) FindRoutes(ctx context.Context, rts [][2]cipher.PubKey, opts *RouteOptions) ([][]routing.Route, error) {
 	if r.err != nil {
-		return nil, nil, r.err
+		return nil, r.err
 	}
 
-	return []routing.Route{
-		{
-			&routing.Hop{
-				From:      src,
-				To:        dst,
-				Transport: transport.MakeTransportID(src, dst, ""),
-			},
-		},
-	}, []routing.Route{
+	if len(rts) == 0 {
+		return nil, fmt.Errorf("no edges provided to return routes from")
+	}
+
+	return [][]routing.Route{
+		{
 			{
 				&routing.Hop{
-					From:      src,
-					To:        dst,
-					Transport: transport.MakeTransportID(src, dst, ""),
+					From:      rts[0][0],
+					To:        rts[0][1],
+					Transport: transport.MakeTransportID(rts[0][0], rts[0][1], ""),
 				},
 			},
-	}, nil
+		},
+	}, nil
 }
diff --git a/pkg/router/router.go b/pkg/router/router.go
index 9b49339fe2..3e8fe7b481 100644
--- a/pkg/router/router.go
+++ b/pkg/router/router.go
@@ -416,7 +416,7 @@ func (r *Router) fetchBestRoutes(source, destination cipher.PubKey) (fwd routing
 	defer timer.Stop()
 
 fetchRoutesAgain:
-	fwdRoutes, revRoutes, err := r.conf.RouteFinder.PairedRoutes(source, destination, minHops, maxHops)
+	fwdRoutes, err := r.conf.RouteFinder.FindRoutes(source, destination, minHops, maxHops)
 	if err != nil {
 		select {
 		case <-timer.C:

From d4887e1993d95962c193389477fa848e7b3e6dff Mon Sep 17 00:00:00 2001
From: ivcosla
Date: Tue, 10 Sep 2019 19:42:38 +0200
Subject: [PATCH 5/8] changed to use new route finder api

---
 cmd/skywire-cli/commands/rtfind/root.go | 9 ++++++---
 pkg/route-finder/client/client.go       | 3 ++-
 pkg/router/router.go                    | 8 +++++---
 3 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/cmd/skywire-cli/commands/rtfind/root.go b/cmd/skywire-cli/commands/rtfind/root.go
index 719b76ef92..52959000bf 100644
--- a/cmd/skywire-cli/commands/rtfind/root.go
+++ b/cmd/skywire-cli/commands/rtfind/root.go
@@ -1,6 +1,7 @@
 package rtfind
 
 import (
+	"context"
 	"fmt"
 	"time"
 
@@ -34,10 +35,12 @@ var RootCmd = &cobra.Command{
 		internal.Catch(srcPK.Set(args[0]))
 		internal.Catch(dstPK.Set(args[1]))
 
-		forward, reverse, err := rfc.PairedRoutes(srcPK, dstPK, frMinHops, frMaxHops)
+		ctx := context.Background()
+		routes, err := rfc.FindRoutes(ctx, [][2]cipher.PubKey{{srcPK, dstPK}, {dstPK, srcPK}},
+			&client.RouteOptions{MinHops: frMinHops, MaxHops: frMaxHops})
 		internal.Catch(err)
-		fmt.Println("forward: ", forward)
-		fmt.Println("reverse: ", reverse)
+		fmt.Println("forward: ", routes[0][0])
+		fmt.Println("reverse: ", routes[1][0])
 	},
 }
diff --git a/pkg/route-finder/client/client.go b/pkg/route-finder/client/client.go
index c9d6f15648..6aa6e7b14d 100644
--- a/pkg/route-finder/client/client.go
+++ b/pkg/route-finder/client/client.go
@@ -21,12 +21,13 @@ const defaultContextTimeout = 10 * time.Second
 
 var log = logging.MustGetLogger("route-finder")
 
+// RouteOptions for FindRoutesRequest. If nil, MinHops and MaxHops take default values
 type RouteOptions struct {
 	MinHops uint16
 	MaxHops uint16
 }
 
-// GetRoutesRequest parses json body for /routes endpoint request
+// FindRoutesRequest parses json body for /routes endpoint request
 type FindRoutesRequest struct {
 	Edges [][2]cipher.PubKey
 	Opts  *RouteOptions
 }
diff --git a/pkg/router/router.go b/pkg/router/router.go
index 3e8fe7b481..f4928a5972 100644
--- a/pkg/router/router.go
+++ b/pkg/router/router.go
@@ -416,7 +416,9 @@ func (r *Router) fetchBestRoutes(source, destination cipher.PubKey) (fwd routing
 	defer timer.Stop()
 
 fetchRoutesAgain:
-	fwdRoutes, err := r.conf.RouteFinder.FindRoutes(source, destination, minHops, maxHops)
+	ctx := context.Background()
+	routes, err := r.conf.RouteFinder.FindRoutes(ctx, [][2]cipher.PubKey{{source, destination}, {destination, source}},
+		&routeFinder.RouteOptions{MinHops: minHops, MaxHops: maxHops})
 	if err != nil {
 		select {
 		case <-timer.C:
@@ -426,8 +428,8 @@ fetchRoutesAgain:
 		}
 	}
 
-	r.Logger.Infof("Found routes Forward: %s. Reverse %s", fwdRoutes, revRoutes)
-	return fwdRoutes[0], revRoutes[0], nil
+	r.Logger.Infof("Found routes Forward: %s. Reverse %s", routes[0], routes[1])
+	return routes[0][0], routes[1][0], nil
 }
 
 // SetupIsTrusted checks if setup node is trusted.
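Aside, not part of the patch series: the calling convention after PATCH 5 is easy to get wrong, since both directions travel in one request and the reply is indexed positionally. A minimal sketch against the PATCH 5 signatures (client.Client, [][2]cipher.PubKey in, [][]routing.Route out); the helper name bestRoutes and the hop bounds are made up, and imports of context, time, cipher, client and routing are assumed:

    // bestRoutes is a hypothetical helper showing the PATCH 5 call shape.
    func bestRoutes(ctx context.Context, rfc client.Client, srcPK, dstPK cipher.PubKey) (fwd, rev routing.Route, err error) {
        ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
        defer cancel()

        // One call carries both directions; the response preserves input order.
        routes, err := rfc.FindRoutes(ctx,
            [][2]cipher.PubKey{{srcPK, dstPK}, {dstPK, srcPK}},
            &client.RouteOptions{MinHops: 1, MaxHops: 5})
        if err != nil {
            return nil, nil, err
        }
        return routes[0][0], routes[1][0], nil // best candidate per direction
    }

PATCH 7 below replaces the positional [][]routing.Route result with a map keyed by routing.PathEdges, which removes the ordering dependency this sketch relies on.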
From 7767da3261aee7e0f6c75697c04b39d2c37c0858 Mon Sep 17 00:00:00 2001
From: ivcosla
Date: Wed, 11 Sep 2019 10:29:23 +0200
Subject: [PATCH 6/8] using context

---
 pkg/route-finder/client/client.go | 2 +-
 pkg/route-finder/client/mock.go   | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/pkg/route-finder/client/client.go b/pkg/route-finder/client/client.go
index 6aa6e7b14d..24857e8cfa 100644
--- a/pkg/route-finder/client/client.go
+++ b/pkg/route-finder/client/client.go
@@ -87,7 +87,7 @@ func (c *apiClient) FindRoutes(ctx context.Context, rts [][2]cipher.PubKey, opts
 		return nil, err
 	}
 	req.Header.Set("Content-Type", "application/json")
-	ctx, cancel := context.WithTimeout(context.Background(), c.apiTimeout)
+	ctx, cancel := context.WithTimeout(ctx, c.apiTimeout)
 	defer cancel()
 
 	req = req.WithContext(ctx)
diff --git a/pkg/route-finder/client/mock.go b/pkg/route-finder/client/mock.go
index c590c8c050..03f28c5769 100644
--- a/pkg/route-finder/client/mock.go
+++ b/pkg/route-finder/client/mock.go
@@ -3,6 +3,7 @@ package client
 import (
 	"context"
 	"fmt"
+
 	"github.com/skycoin/dmsg/cipher"
 
 	"github.com/skycoin/skywire/pkg/routing"

From 9e890f47c9cf24c1489b34d623d5e72a91417aea Mon Sep 17 00:00:00 2001
From: ivcosla
Date: Fri, 13 Sep 2019 18:58:24 +0200
Subject: [PATCH 7/8] changed findRoutes return type

---
 cmd/skywire-cli/commands/rtfind/root.go | 10 ++++----
 go.sum                                  |  1 +
 pkg/route-finder/client/client.go       | 13 +++++------
 pkg/route-finder/client/mock.go         |  8 +++----
 pkg/router/router.go                    | 10 ++++----
 pkg/routing/loop.go                     |  4 ++--
 pkg/routing/packet_test.go              | 22 ++++++++++++++++++
 pkg/routing/route.go                    | 31 +++++++++++++++++++++++++
 pkg/setup/idreservoir.go                |  4 ++--
 pkg/setup/node.go                       |  2 +-
 10 files changed, 81 insertions(+), 24 deletions(-)

diff --git a/cmd/skywire-cli/commands/rtfind/root.go b/cmd/skywire-cli/commands/rtfind/root.go
index 52959000bf..4395c09ac6 100644
--- a/cmd/skywire-cli/commands/rtfind/root.go
+++ b/cmd/skywire-cli/commands/rtfind/root.go
@@ -10,6 +10,7 @@ import (
 
 	"github.com/skycoin/skywire/cmd/skywire-cli/internal"
 	"github.com/skycoin/skywire/pkg/route-finder/client"
+	"github.com/skycoin/skywire/pkg/routing"
 )
 
 var frAddr string
@@ -34,13 +35,14 @@ var RootCmd = &cobra.Command{
 		var srcPK, dstPK cipher.PubKey
 		internal.Catch(srcPK.Set(args[0]))
 		internal.Catch(dstPK.Set(args[1]))
-
+		forward := [2]cipher.PubKey{srcPK, dstPK}
+		backward := [2]cipher.PubKey{dstPK, srcPK}
 		ctx := context.Background()
-		routes, err := rfc.FindRoutes(ctx, [][2]cipher.PubKey{{srcPK, dstPK}, {dstPK, srcPK}},
+		routes, err := rfc.FindRoutes(ctx, []routing.PathEdges{forward, backward},
 			&client.RouteOptions{MinHops: frMinHops, MaxHops: frMaxHops})
 		internal.Catch(err)
-		fmt.Println("forward: ", routes[0][0])
-		fmt.Println("reverse: ", routes[1][0])
+		fmt.Println("forward: ", routes[forward][0])
+		fmt.Println("reverse: ", routes[backward][0])
 	},
 }
diff --git a/go.sum b/go.sum
index 60f365b041..2beaa995ca 100644
--- a/go.sum
+++ b/go.sum
@@ -112,6 +112,7 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
 github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
 github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
diff --git a/pkg/route-finder/client/client.go b/pkg/route-finder/client/client.go
index 24857e8cfa..72e5f0c92e 100644
--- a/pkg/route-finder/client/client.go
+++ b/pkg/route-finder/client/client.go
@@ -11,7 +11,6 @@ import (
 	"strings"
 	"time"
 
-	"github.com/skycoin/dmsg/cipher"
 	"github.com/skycoin/skycoin/src/util/logging"
 
 	"github.com/skycoin/skywire/pkg/routing"
@@ -29,7 +28,7 @@ type RouteOptions struct {
 
 // FindRoutesRequest parses json body for /routes endpoint request
 type FindRoutesRequest struct {
-	Edges [][2]cipher.PubKey
+	Edges []routing.PathEdges
 	Opts  *RouteOptions
 }
@@ -47,7 +46,7 @@ type HTTPError struct {
 
 // Client implements route finding operations.
 type Client interface {
-	FindRoutes(ctx context.Context, rts [][2]cipher.PubKey, opts *RouteOptions) ([][]routing.Route, error)
+	FindRoutes(ctx context.Context, rts []routing.PathEdges, opts *RouteOptions) (map[routing.PathEdges][]routing.Path, error)
 }
@@ -72,7 +71,7 @@ func NewHTTP(addr string, apiTimeout time.Duration) Client {
 	}
 }
 
 // FindRoutes returns routes from a source skywire visor to a destination visor that have at least the given minHops
 // and at most the given maxHops.
-func (c *apiClient) FindRoutes(ctx context.Context, rts [][2]cipher.PubKey, opts *RouteOptions) ([][]routing.Route, error) {
+func (c *apiClient) FindRoutes(ctx context.Context, rts []routing.PathEdges, opts *RouteOptions) (map[routing.PathEdges][]routing.Path, error) {
 	requestBody := &FindRoutesRequest{
 		Edges: rts,
 		Opts:  opts,
@@ -114,13 +113,13 @@ func (c *apiClient) FindRoutes(ctx context.Context, rts [][2]cipher.PubKey, opts
 		return nil, errors.New(apiErr.Error.Message)
 	}
 
-	var routes [][]routing.Route
-	err = json.NewDecoder(res.Body).Decode(&routes)
+	var paths map[routing.PathEdges][]routing.Path
+	err = json.NewDecoder(res.Body).Decode(&paths)
 	if err != nil {
 		return nil, err
 	}
 
-	return routes, nil
+	return paths, nil
 }
 
 func sanitizedAddr(addr string) string {
diff --git a/pkg/route-finder/client/mock.go b/pkg/route-finder/client/mock.go
index 03f28c5769..359802a91b 100644
--- a/pkg/route-finder/client/mock.go
+++ b/pkg/route-finder/client/mock.go
@@ -27,7 +27,7 @@ func (r *mockClient) SetError(err error) {
 }
 
 // FindRoutes implements Client for MockClient
-func (r *mockClient) FindRoutes(ctx context.Context, rts [][2]cipher.PubKey, opts *RouteOptions) ([][]routing.Route, error) {
+func (r *mockClient) FindRoutes(ctx context.Context, rts []routing.PathEdges, opts *RouteOptions) (map[routing.PathEdges][]routing.Path, error) {
 	if r.err != nil {
 		return nil, r.err
 	}
@@ -35,10 +35,10 @@ func (r *mockClient) FindRoutes(ctx context.Context, rts [][2]cipher.PubKey, opt
 	if len(rts) == 0 {
 		return nil, fmt.Errorf("no edges provided to return routes from")
 	}
-	return [][]routing.Route{
-		{
+	return map[routing.PathEdges][]routing.Path{
+		[2]cipher.PubKey{rts[0][0], rts[0][1]}: {
 			{
-				&routing.Hop{
+				routing.Hop{
 					From:      rts[0][0],
 					To:        rts[0][1],
 					Transport: transport.MakeTransportID(rts[0][0], rts[0][1], ""),
 				},
 			},
-	}, nil
+		},
+	}, nil
 }
diff --git a/pkg/router/router.go b/pkg/router/router.go
index f4928a5972..93865e066f 100644
--- a/pkg/router/router.go
+++ b/pkg/router/router.go
@@ -409,15 +409,17 @@ func (r *Router) destroyLoop(loop routing.Loop) error {
 	return r.rm.RemoveLoopRule(loop)
 }
 
-func (r *Router) fetchBestRoutes(source, destination cipher.PubKey) (fwd routing.Route, rev routing.Route, err error) {
+func (r *Router) fetchBestRoutes(source, destination cipher.PubKey) (fwd routing.Path, rev routing.Path, err error) {
 	r.Logger.Infof("Requesting new routes from %s to %s", source, destination)
 
 	timer := time.NewTimer(time.Second * 10)
 	defer timer.Stop()
 
+	forward := [2]cipher.PubKey{source, destination}
+	backward := [2]cipher.PubKey{destination, source}
 fetchRoutesAgain:
 	ctx := context.Background()
-	routes, err := r.conf.RouteFinder.FindRoutes(ctx, [][2]cipher.PubKey{{source, destination}, {destination, source}},
+	paths, err := r.conf.RouteFinder.FindRoutes(ctx, []routing.PathEdges{forward, backward},
 		&routeFinder.RouteOptions{MinHops: minHops, MaxHops: maxHops})
 	if err != nil {
 		select {
 		case <-timer.C:
@@ -428,8 +430,8 @@ fetchRoutesAgain:
 		}
 	}
 
-	r.Logger.Infof("Found routes Forward: %s. Reverse %s", routes[0], routes[1])
-	return routes[0][0], routes[1][0], nil
+	r.Logger.Infof("Found routes Forward: %s. Reverse %s", paths[forward], paths[backward])
+	return paths[forward][0], paths[backward][0], nil
 }
 
 // SetupIsTrusted checks if setup node is trusted.
diff --git a/pkg/routing/loop.go b/pkg/routing/loop.go
index 3206fd8aaa..0be65091a4 100644
--- a/pkg/routing/loop.go
+++ b/pkg/routing/loop.go
@@ -21,8 +21,8 @@ func (l Loop) String() string {
 // LoopDescriptor defines a loop over a pair of routes.
 type LoopDescriptor struct {
 	Loop      Loop
-	Forward   Route
-	Reverse   Route
+	Forward   Path
+	Reverse   Path
 	KeepAlive time.Duration
 }
diff --git a/pkg/routing/packet_test.go b/pkg/routing/packet_test.go
index 2558da29cd..3e0cbccda4 100644
--- a/pkg/routing/packet_test.go
+++ b/pkg/routing/packet_test.go
@@ -1,9 +1,12 @@
 package routing
 
 import (
+	"encoding/json"
 	"testing"
 
+	"github.com/skycoin/dmsg/cipher"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
 func TestMakePacket(t *testing.T) {
@@ -18,3 +21,22 @@ func TestMakePacket(t *testing.T) {
 	assert.Equal(t, RouteID(2), packet.RouteID())
 	assert.Equal(t, []byte("foo"), packet.Payload())
 }
+
+func TestEncoding(t *testing.T) {
+	pka, _ := cipher.GenerateKeyPair()
+	pkb, _ := cipher.GenerateKeyPair()
+
+	edges1 := PathEdges{pka, pkb}
+	edges2 := PathEdges{pkb, pka}
+
+	m := map[PathEdges]string{edges1: "a", edges2: "b"}
+
+	b, err := json.Marshal(m)
+	require.NoError(t, err)
+
+	m2 := make(map[PathEdges]string)
+
+	err = json.Unmarshal(b, &m2)
+	require.NoError(t, err)
+	assert.Equal(t, m, m2)
+}
diff --git a/pkg/routing/route.go b/pkg/routing/route.go
index 86a962748c..8cb987183f 100644
--- a/pkg/routing/route.go
+++ b/pkg/routing/route.go
@@ -3,6 +3,7 @@
 package routing
 
 import (
+	"encoding/json"
 	"fmt"
 
 	"github.com/google/uuid"
@@ -20,6 +21,36 @@ func (h Hop) String() string {
 	return fmt.Sprintf("%s -> %s @ %s", h.From, h.To, h.Transport)
 }
 
+// PathEdges are the edge nodes of a path
+type PathEdges [2]cipher.PubKey
+
+// PathEdgesText is used internally for marshaling and unmarshaling of PathEdges
+type PathEdgesText struct {
+	Edge1 cipher.PubKey `json:"edge_1"`
+	Edge2 cipher.PubKey `json:"edge_2"`
+}
+
+// MarshalText implements encoding.TextMarshaler
+func (p PathEdges) MarshalText() ([]byte, error) {
+	return json.Marshal(PathEdgesText{p[0], p[1]})
+}
+
+// UnmarshalText implements encoding.TextUnmarshaler
+func (p *PathEdges) UnmarshalText(b []byte) error {
+	edges := PathEdgesText{}
+	err := json.Unmarshal(b, &edges)
+	if err != nil {
+		return err
+	}
+
+	p[0] = edges.Edge1
+	p[1] = edges.Edge2
+	return nil
+}
+
+// Path is a list of hops between nodes (transports), and indicates a route between the edges
+type Path []Hop
+
 // Route is a succession of transport entries that denotes a path from source node to destination node
 type Route []*Hop
diff --git a/pkg/setup/idreservoir.go b/pkg/setup/idreservoir.go
index 467192c6cd..8fff74bc28 100644
--- a/pkg/setup/idreservoir.go
+++ b/pkg/setup/idreservoir.go
@@ -19,7 +19,7 @@ type idReservoir struct {
 	mx  sync.Mutex
 }
 
-func newIDReservoir(routes ...routing.Route) (*idReservoir, int) {
+func newIDReservoir(routes ...routing.Path) (*idReservoir, int) {
 	rec := make(map[cipher.PubKey]uint8)
 	var total int
 
@@ -130,7 +130,7 @@ func GenerateRules(idc *idReservoir, ld routing.LoopDescriptor) (rules RulesMap,
 // - firstRID: the first visor's route ID.
 // - lastRID: the last visor's route ID (note that there is no rule set for this ID yet).
 // - err: an error (if any).
-func SaveForwardRules(rules RulesMap, idc *idReservoir, keepAlive time.Duration, route routing.Route) (firstRID, lastRID routing.RouteID, err error) {
+func SaveForwardRules(rules RulesMap, idc *idReservoir, keepAlive time.Duration, route routing.Path) (firstRID, lastRID routing.RouteID, err error) {
 	// 'firstRID' is the first visor's key routeID - this is to be returned.
 	var ok bool
diff --git a/pkg/setup/node.go b/pkg/setup/node.go
index c522a73297..eef2a9f523 100644
--- a/pkg/setup/node.go
+++ b/pkg/setup/node.go
@@ -224,7 +224,7 @@ func (sn *Node) handleCreateLoop(ctx context.Context, ld routing.LoopDescriptor)
 	return nil
 }
 
-func (sn *Node) reserveRouteIDs(ctx context.Context, fwd, rev routing.Route) (*idReservoir, error) {
+func (sn *Node) reserveRouteIDs(ctx context.Context, fwd, rev routing.Path) (*idReservoir, error) {
 	idc, total := newIDReservoir(fwd, rev)
 	sn.Logger.Infof("There are %d route IDs to reserve.", total)

From 33b11bb5849c192a3585a5aed40a494bf3ae3dc5 Mon Sep 17 00:00:00 2001
From: ivcosla
Date: Sat, 14 Sep 2019 16:46:11 +0200
Subject: [PATCH 8/8] changed PathEdges marshaltext

---
 pkg/routing/route.go | 31 ++++++++++++++++++-------------
 1 file changed, 18 insertions(+), 13 deletions(-)

diff --git a/pkg/routing/route.go b/pkg/routing/route.go
index 8cb987183f..021b56a5d3 100644
--- a/pkg/routing/route.go
+++ b/pkg/routing/route.go
@@ -3,7 +3,7 @@
 package routing
 
 import (
-	"encoding/json"
+	"bytes"
 	"fmt"
 
 	"github.com/google/uuid"
@@ -24,27 +24,32 @@ func (h Hop) String() string {
 // PathEdges are the edge nodes of a path
 type PathEdges [2]cipher.PubKey
 
-// PathEdgesText is used internally for marshaling and unmarshaling of PathEdges
-type PathEdgesText struct {
-	Edge1 cipher.PubKey `json:"edge_1"`
-	Edge2 cipher.PubKey `json:"edge_2"`
-}
-
 // MarshalText implements encoding.TextMarshaler
 func (p PathEdges) MarshalText() ([]byte, error) {
-	return json.Marshal(PathEdgesText{p[0], p[1]})
+	b1, err := p[0].MarshalText()
+	if err != nil {
+		return nil, err
+	}
+	b2, err := p[1].MarshalText()
+	if err != nil {
+		return nil, err
+	}
+	res := bytes.NewBuffer(b1)
+	res.WriteString(":") // nolint
+	res.Write(b2)        // nolint
+	return res.Bytes(), nil
 }
 
-// UnmarshalText implements encoding.TextUnmarshaler
+// UnmarshalText implements encoding.TextUnmarshaler
 func (p *PathEdges) UnmarshalText(b []byte) error {
-	edges := PathEdgesText{}
-	err := json.Unmarshal(b, &edges)
+	err := p[0].UnmarshalText(b[:66])
+	if err != nil {
+		return err
+	}
+	err = p[1].UnmarshalText(b[67:])
 	if err != nil {
 		return err
 	}
-
-	p[0] = edges.Edge1
-	p[1] = edges.Edge2
 	return nil
 }
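Aside, not part of the patch series: the PATCH 8 encoding deserves one worked example. A cipher.PubKey is 33 bytes, so its text form is 66 hex characters; MarshalText therefore emits "<66 hex>:<66 hex>" and UnmarshalText slices at the fixed offsets b[:66] and b[67:], skipping the separator at index 66. This is what lets the map[routing.PathEdges] results survive a JSON round trip (encoding/json uses TextMarshaler/TextUnmarshaler for map keys, as TestEncoding in PATCH 7 exercises). A sketch; the function name is illustrative, imports of cipher and routing are assumed, and it presumes PubKey.MarshalText yields the 66-character hex form:

    // pathEdgesRoundTrip is a hypothetical demonstration of the PATCH 8 encoding.
    func pathEdgesRoundTrip() error {
        pka, _ := cipher.GenerateKeyPair()
        pkb, _ := cipher.GenerateKeyPair()

        edges := routing.PathEdges{pka, pkb}
        txt, err := edges.MarshalText() // 133 bytes: 66 hex chars, ':', 66 hex chars
        if err != nil {
            return err
        }

        var decoded routing.PathEdges
        if err := decoded.UnmarshalText(txt); err != nil { // slices at offsets 66 and 67
            return err
        }
        // decoded now equals edges, so PathEdges works as a JSON map key.
        return nil
    }

One caveat the patch leaves open: UnmarshalText indexes b[:66] and b[67:] without a length check, so input shorter than 67 bytes would panic rather than return an error; a guard may be worth a follow-up.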