Skip to content

Commit

Permalink
Merge branch 'mainnet-milestone1' into bug/fix-tests-uncomment
Browse files Browse the repository at this point in the history
# Conflicts:
#	cmd/skywire-cli/commands/node/routes.go
#	pkg/router/route_manager_test.go
#	pkg/routing/rule.go
#	pkg/routing/rule_test.go
#	pkg/setup/node.go
#	pkg/setup/node_test.go
#	pkg/transport/mock.go
#	pkg/visor/rpc_client.go
  • Loading branch information
nkryuchkov committed Sep 4, 2019
2 parents 253e89b + 2a81b33 commit d02bad5
Show file tree
Hide file tree
Showing 27 changed files with 293 additions and 161 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ rerun: stop
lint: ## Run linters. Use make install-linters first
${OPTS} golangci-lint run -c .golangci.yml ./...
# The govet version in golangci-lint is out of date and has spurious warnings, run it separately
${OPTS} go vet -mod=vendor -all ./...
${OPTS} go vet -all ./...

vendorcheck: ## Run vendorcheck
GO111MODULE=off vendorcheck ./internal/...
Expand Down
12 changes: 6 additions & 6 deletions cmd/skywire-cli/commands/node/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ var rmRuleCmd = &cobra.Command{
},
}

var expire time.Duration
var keepAlive time.Duration

func init() {
addRuleCmd.PersistentFlags().DurationVar(&expire, "expire", router.RouteTTL, "duration after which routing rule will expire")
addRuleCmd.PersistentFlags().DurationVar(&keepAlive, "keep-alive", router.DefaultRouteKeepAlive, "duration after which routing rule will expire if no activity is present")
}

var addRuleCmd = &cobra.Command{
Expand Down Expand Up @@ -100,13 +100,13 @@ var addRuleCmd = &cobra.Command{
remotePort = routing.Port(parseUint("remote-port", args[3], 16))
localPort = routing.Port(parseUint("local-port", args[4], 16))
)
rule = routing.AppRule(time.Now().Add(expire), 0, routeID, remotePK, localPort, remotePort)
rule = routing.AppRule(keepAlive, 0, routeID, remotePK, localPort, remotePort)
case "fwd":
var (
nextRouteID = routing.RouteID(parseUint("next-route-id", args[1], 32))
nextTpID = internal.ParseUUID("next-transport-id", args[2])
)
rule = routing.ForwardRule(time.Now().Add(expire), nextRouteID, nextTpID, 0)
rule = routing.ForwardRule(keepAlive, nextRouteID, nextTpID, 0)
}
rIDKey, err := rpcClient().AddRoutingRule(rule)
internal.Catch(err)
Expand All @@ -117,12 +117,12 @@ var addRuleCmd = &cobra.Command{
func printRoutingRules(rules ...*visor.RoutingEntry) {
printAppRule := func(w io.Writer, id routing.RouteID, s *routing.RuleSummary) {
_, err := fmt.Fprintf(w, "%d\t%s\t%d\t%d\t%s\t%d\t%s\t%s\t%s\n", id, s.Type, s.AppFields.LocalPort,
s.AppFields.RemotePort, s.AppFields.RemotePK, s.AppFields.RespRID, "-", "-", s.ExpireAt)
s.AppFields.RemotePort, s.AppFields.RemotePK, s.AppFields.RespRID, "-", "-", s.KeepAlive)
internal.Catch(err)
}
printFwdRule := func(w io.Writer, id routing.RouteID, s *routing.RuleSummary) {
_, err := fmt.Fprintf(w, "%d\t%s\t%s\t%s\t%s\t%s\t%d\t%s\t%s\n", id, s.Type, "-",
"-", "-", "-", s.ForwardFields.NextRID, s.ForwardFields.NextTID, s.ExpireAt)
"-", "-", "-", s.ForwardFields.NextRID, s.ForwardFields.NextTID, s.KeepAlive)
internal.Catch(err)
}
w := tabwriter.NewWriter(os.Stdout, 0, 0, 5, ' ', tabwriter.TabIndent)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ require (
)

// Uncomment for tests with alternate branches of 'dmsg'
//replace github.com/skycoin/dmsg => ../dmsg
replace github.com/skycoin/dmsg => ../dmsg
11 changes: 10 additions & 1 deletion internal/utclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ import (
"net/http"

"github.com/skycoin/dmsg/cipher"
"github.com/skycoin/skycoin/src/util/logging"

"github.com/skycoin/skywire/internal/httpauth"
)

var log = logging.MustGetLogger("utclient")

// Error is the object returned to the client when there's an error.
type Error struct {
Error string `json:"error"`
Expand Down Expand Up @@ -61,10 +64,16 @@ func (c *httpClient) Get(ctx context.Context, path string) (*http.Response, erro
// UpdateNodeUptime updates node uptime.
func (c *httpClient) UpdateNodeUptime(ctx context.Context) error {
resp, err := c.Get(ctx, "/update")
if resp != nil {
defer func() {
if err := resp.Body.Close(); err != nil {
log.WithError(err).Warn("Failed to close response body")
}
}()
}
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("status: %d, error: %v", resp.StatusCode, extractError(resp.Body))
Expand Down
8 changes: 6 additions & 2 deletions internal/utclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ func TestClientAuth(t *testing.T) {
headerCh <- r.Header

case fmt.Sprintf("/security/nonces/%s", testPubKey):
fmt.Fprintf(w, `{"edge": "%s", "next_nonce": 1}`, testPubKey)
if _, err := fmt.Fprintf(w, `{"edge": "%s", "next_nonce": 1}`, testPubKey); err != nil {
t.Errorf("Failed to write nonce response: %s", err)
}

default:
t.Errorf("Don't know how to handle URL = '%s'", url)
Expand Down Expand Up @@ -75,7 +77,9 @@ func authHandler(next http.Handler) http.Handler {
m := http.NewServeMux()
m.Handle("/security/nonces/", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(&httpauth.NextNonceResponse{Edge: testPubKey, NextNonce: 1}) // nolint: errcheck
if err := json.NewEncoder(w).Encode(&httpauth.NextNonceResponse{Edge: testPubKey, NextNonce: 1}); err != nil {
log.WithError(err).Error("Failed to encode nonce response")
}
},
))
m.Handle("/", next)
Expand Down
69 changes: 59 additions & 10 deletions pkg/router/managed_routing_table.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package router

import (
"errors"
"sync"
"time"

"github.com/skycoin/skywire/pkg/routing"
)

var routeKeepalive = 10 * time.Minute // interval to keep active expired routes
var (
// ErrRuleTimedOut is being returned while trying to access the rule which timed out
ErrRuleTimedOut = errors.New("rule keep-alive timeout exceeded")
)

type managedRoutingTable struct {
routing.Table
Expand All @@ -23,29 +27,74 @@ func manageRoutingTable(rt routing.Table) *managedRoutingTable {
}
}

func (rt *managedRoutingTable) AddRule(rule routing.Rule) (routing.RouteID, error) {
rt.mu.Lock()
defer rt.mu.Unlock()

routeID, err := rt.Table.AddRule(rule)
if err != nil {
return 0, err
}

// set the initial activity for rule not to be timed out instantly
rt.activity[routeID] = time.Now()

return routeID, nil
}

func (rt *managedRoutingTable) Rule(routeID routing.RouteID) (routing.Rule, error) {
rt.mu.Lock()
defer rt.mu.Unlock()

rule, err := rt.Table.Rule(routeID)
if err != nil {
return nil, err
}

if rt.ruleIsTimedOut(routeID, rule) {
return nil, ErrRuleTimedOut
}

rt.activity[routeID] = time.Now()
rt.mu.Unlock()
return rt.Table.Rule(routeID)

return rule, nil
}

func (rt *managedRoutingTable) Cleanup() error {
expiredIDs := make([]routing.RouteID, 0)
rt.mu.Lock()
defer rt.mu.Unlock()

err := rt.RangeRules(func(routeID routing.RouteID, rule routing.Rule) bool {
if rule.Expiry().Before(time.Now()) {
if lastActivity, ok := rt.activity[routeID]; !ok || time.Since(lastActivity) > routeKeepalive {
expiredIDs = append(expiredIDs, routeID)
}
if rt.ruleIsTimedOut(routeID, rule) {
expiredIDs = append(expiredIDs, routeID)
}
return true
})
rt.mu.Unlock()

if err != nil {
return err
}

return rt.DeleteRules(expiredIDs...)
if err := rt.DeleteRules(expiredIDs...); err != nil {
return err
}

rt.deleteActivity(expiredIDs...)

return nil
}

// ruleIsExpired checks whether rule's keep alive timeout is exceeded.
// NOTE: for internal use, is NOT thread-safe, object lock should be acquired outside
func (rt *managedRoutingTable) ruleIsTimedOut(routeID routing.RouteID, rule routing.Rule) bool {
lastActivity, ok := rt.activity[routeID]
return !ok || time.Since(lastActivity) > rule.KeepAlive()
}

// deleteActivity removes activity records for the specified set of `routeIDs`.
// NOTE: for internal use, is NOT thread-safe, object lock should be acquired outside
func (rt *managedRoutingTable) deleteActivity(routeIDs ...routing.RouteID) {
for _, rID := range routeIDs {
delete(rt.activity, rID)
}
}
10 changes: 7 additions & 3 deletions pkg/router/managed_routing_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@ import (
func TestManagedRoutingTableCleanup(t *testing.T) {
rt := manageRoutingTable(routing.InMemoryRoutingTable())

_, err := rt.AddRule(routing.ForwardRule(time.Now().Add(time.Hour), 3, uuid.New(), 1))
_, err := rt.AddRule(routing.ForwardRule(1*time.Hour, 3, uuid.New(), 1))
require.NoError(t, err)

id, err := rt.AddRule(routing.ForwardRule(time.Now().Add(-time.Hour), 3, uuid.New(), 2))
id, err := rt.AddRule(routing.ForwardRule(1*time.Hour, 3, uuid.New(), 2))
require.NoError(t, err)

id2, err := rt.AddRule(routing.ForwardRule(time.Now().Add(-time.Hour), 3, uuid.New(), 3))
id2, err := rt.AddRule(routing.ForwardRule(-1*time.Hour, 3, uuid.New(), 3))
require.NoError(t, err)

// rule should already be expired at this point due to the execution time.
// However, we'll just a bit to be sure
time.Sleep(1 * time.Millisecond)

assert.Equal(t, 3, rt.Count())

_, err = rt.Rule(id)
Expand Down
25 changes: 16 additions & 9 deletions pkg/router/route_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (
"github.com/skycoin/skywire/pkg/snet"
)

// RMConfig represents route manager configuration.
type RMConfig struct {
SetupPKs []cipher.PubKey // Trusted setup PKs.
GarbageCollectDuration time.Duration
OnConfirmLoop func(loop routing.Loop, rule routing.Rule) (err error)
OnLoopClosed func(loop routing.Loop) error
}

// SetupIsTrusted checks if setup node is trusted.
func (sc RMConfig) SetupIsTrusted(sPK cipher.PubKey) bool {
for _, pk := range sc.SetupPKs {
if sPK == pk {
Expand All @@ -33,6 +35,7 @@ func (sc RMConfig) SetupIsTrusted(sPK cipher.PubKey) bool {
return false
}

// routeManager represents route manager.
type routeManager struct {
Logger *logging.Logger
conf RMConfig
Expand All @@ -42,8 +45,8 @@ type routeManager struct {
done chan struct{}
}

// NewRouteManager creates a new route manager.
func NewRouteManager(n *snet.Network, rt routing.Table, config RMConfig) (*routeManager, error) {
// newRouteManager creates a new route manager.
func newRouteManager(n *snet.Network, rt routing.Table, config RMConfig) (*routeManager, error) {
sl, err := n.Listen(snet.DmsgType, snet.AwaitSetupPort)
if err != nil {
return nil, err
Expand All @@ -58,11 +61,13 @@ func NewRouteManager(n *snet.Network, rt routing.Table, config RMConfig) (*route
}, nil
}

// Close closes route manager.
func (rm *routeManager) Close() error {
close(rm.done)
return rm.sl.Close()
}

// Serve initiates serving connections by route manager.
func (rm *routeManager) Serve() {
// Routing table garbage collect loop.
go rm.rtGarbageCollectLoop()
Expand Down Expand Up @@ -97,7 +102,11 @@ func (rm *routeManager) serveConn() error {
}

func (rm *routeManager) handleSetupConn(conn net.Conn) error {
defer func() { _ = conn.Close() }() //nolint:errcheck
defer func() {
if err := conn.Close(); err != nil {
log.WithError(err).Warn("Failed to close connection")
}
}()

proto := setup.NewSetupProtocol(conn)
t, body, err := proto.ReadPacket()
Expand Down Expand Up @@ -148,7 +157,7 @@ func (rm *routeManager) rtGarbageCollectLoop() {
}
}

func (rm *routeManager) dialSetupConn(ctx context.Context) (*snet.Conn, error) {
func (rm *routeManager) dialSetupConn(_ context.Context) (*snet.Conn, error) {
for _, sPK := range rm.conf.SetupPKs {
conn, err := rm.n.Dial(snet.DmsgType, sPK, snet.SetupPort)
if err != nil {
Expand All @@ -160,6 +169,7 @@ func (rm *routeManager) dialSetupConn(ctx context.Context) (*snet.Conn, error) {
return nil, errors.New("failed to dial to a setup node")
}

// GetRule gets routing rule.
func (rm *routeManager) GetRule(routeID routing.RouteID) (routing.Rule, error) {
rule, err := rm.rt.Rule(routeID)
if err != nil {
Expand All @@ -176,13 +186,10 @@ func (rm *routeManager) GetRule(routeID routing.RouteID) (routing.Rule, error) {
return nil, errors.New("corrupted rule")
}

if rule.Expiry().Before(time.Now()) {
return nil, errors.New("expired routing rule")
}

return rule, nil
}

// RemoveLoopRule removes loop rule.
func (rm *routeManager) RemoveLoopRule(loop routing.Loop) error {
var appRouteID routing.RouteID
var appRule routing.Rule
Expand Down Expand Up @@ -306,7 +313,7 @@ func (rm *routeManager) loopClosed(data []byte) error {
}

func (rm *routeManager) occupyRouteID() ([]routing.RouteID, error) {
rule := routing.ForwardRule(time.Now().Add(RouteTTL), 0, uuid.UUID{}, 0)
rule := routing.ForwardRule(DefaultRouteKeepAlive, 0, uuid.UUID{}, 0)
routeID, err := rm.rt.AddRule(rule)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit d02bad5

Please sign in to comment.