Skip to content

Commit

Permalink
Merge pull request #523 from evanlinjin/mainnet-milestone1
Browse files Browse the repository at this point in the history
Fix router tests.
  • Loading branch information
志宇 authored Aug 21, 2019
2 parents d4cd530 + 22b0f44 commit 9ad1927
Show file tree
Hide file tree
Showing 49 changed files with 505 additions and 1,209 deletions.
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ require (
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/common v0.4.1
github.com/sirupsen/logrus v1.4.2
github.com/skycoin/dmsg v0.0.0-20190805065636-70f4c32a994f
github.com/skycoin/dmsg v0.0.0-20190816104216-d18ee6aa05cb
github.com/skycoin/skycoin v0.26.0
github.com/spf13/cobra v0.0.5
github.com/stretchr/testify v1.3.0
go.etcd.io/bbolt v1.3.3
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80
golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa // indirect
golang.org/x/tools v0.0.0-20190805222050-c5a2fd39b72a // indirect
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a // indirect
golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7 // indirect
)

// Uncomment for tests with alternate branches of 'dmsg'
replace github.com/skycoin/dmsg => ../dmsg
//replace github.com/skycoin/dmsg => ../dmsg
13 changes: 13 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/skycoin/dmsg v0.0.0-20190805065636-70f4c32a994f h1:WWjaxOXoj6oYelm67MNtJbg51HQALjKAyhs2WAHgpZs=
github.com/skycoin/dmsg v0.0.0-20190805065636-70f4c32a994f/go.mod h1:obZYZp8eKR7Xqz+KNhJdUE6Gvp6rEXbDO8YTlW2YXgU=
github.com/skycoin/dmsg v0.0.0-20190816104216-d18ee6aa05cb h1:kpNxP3mOjrVyyLBOtOxBgpxUOCBBI/RhdO9Vto5+OHk=
github.com/skycoin/dmsg v0.0.0-20190816104216-d18ee6aa05cb/go.mod h1:obZYZp8eKR7Xqz+KNhJdUE6Gvp6rEXbDO8YTlW2YXgU=
github.com/skycoin/skycoin v0.26.0 h1:xDxe2r8AclMntZ550Y/vUQgwgLtwrf9Wu5UYiYcN5/o=
github.com/skycoin/skycoin v0.26.0/go.mod h1:78nHjQzd8KG0jJJVL/j0xMmrihXi70ti63fh8vXScJw=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
Expand All @@ -116,12 +118,16 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586 h1:7KByu05hhLed2MO29w7p1XfZvZ13m8mub3shuVftRs0=
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 h1:Ao/3l156eZf2AW5wK8a7/smtodRU+gha3+BeqJ69lRk=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7 h1:fHDIZ2oxGnUZRN6WgWFCbYBjH9uqVPRCUVUDhs0wnbA=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -135,13 +141,20 @@ golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M=
golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190627182818-9947fec5c3ab/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190802220118-1d1727260058/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=
golang.org/x/tools v0.0.0-20190805222050-c5a2fd39b72a h1:0AGI+cC4FJwXNdClvHzfHhJf/yPjKwdo/+m0lPKrdJA=
golang.org/x/tools v0.0.0-20190805222050-c5a2fd39b72a/go.mod h1:jcCCGcm9btYwXyDqrUWc6MKQKKGJCWEQ3AfLSRIbEuI=
golang.org/x/tools v0.0.0-20190816200558-6889da9d5479 h1:lfN2PY/jymfnxkNHlbBF5DwPsUvhqUnrdgfK01iH2s0=
golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7 h1:PVCvyir09Xgta5zksNZDkrL+eSm/Y+gQxRG3IfqNQ3A=
golang.org/x/tools v0.0.0-20190821162956-65e3620a7ae7/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
4 changes: 2 additions & 2 deletions internal/testhelpers/testhelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (

const timeout = 5 * time.Second

// NoErrorWithinTimeout tries to read an error from error channel within timeout and returns it.
// WithinTimeout tries to read an error from error channel within timeout and returns it.
// If timeout exceeds, nil value is returned.
func NoErrorWithinTimeout(ch <-chan error) error {
func WithinTimeout(ch <-chan error) error {
select {
case err := <-ch:
return err
Expand Down
23 changes: 23 additions & 0 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,28 @@ func SetupFromPipe(config *Config, inFD, outFD uintptr) (*App, error) {
return app, nil
}

// New creates a new App directly from a `net.Conn` implementation.
func New(conn net.Conn, conf *Config) (*App, error) {
app := &App{
config: *conf,
proto: NewProtocol(conn),
acceptChan: make(chan [2]routing.Addr),
doneChan: make(chan struct{}),
conns: make(map[routing.Loop]io.ReadWriteCloser),
}

go app.handleProto()

if err := app.proto.Send(FrameInit, conf, nil); err != nil {
if err := app.Close(); err != nil {
log.WithError(err).Warn("Failed to close app")
}
return nil, fmt.Errorf("INIT handshake failed: %s", err)
}

return app, nil
}

// Setup setups app using default pair of pipes
func Setup(config *Config) (*App, error) {
return SetupFromPipe(config, DefaultIn, DefaultOut)
Expand Down Expand Up @@ -197,6 +219,7 @@ func (app *App) serveConn(loop routing.Loop, conn io.ReadWriteCloser) {
if err != nil {
break
}
fmt.Println("READ:", buf)

packet := &Packet{Loop: loop, Payload: buf[:n]}
if err := app.proto.Send(FrameSend, packet, nil); err != nil {
Expand Down
14 changes: 7 additions & 7 deletions pkg/app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestAppDial(t *testing.T) {
require.Len(t, app.conns, 0)
app.mu.Unlock()
require.NoError(t, proto.Close())
require.NoError(t, testhelpers.NoErrorWithinTimeout(serveErrCh))
require.NoError(t, testhelpers.WithinTimeout(serveErrCh))
}

func TestAppAccept(t *testing.T) {
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestAppAccept(t *testing.T) {
assert.Equal(t, lpk.Hex()+":2", conn.LocalAddr().String())
require.Len(t, app.conns, 2)
require.NoError(t, proto.Close())
require.NoError(t, testhelpers.NoErrorWithinTimeout(serveErrCh))
require.NoError(t, testhelpers.WithinTimeout(serveErrCh))
}

func TestAppWrite(t *testing.T) {
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestAppWrite(t *testing.T) {
assert.Equal(t, []byte("foo"), packet.Payload)

require.NoError(t, proto.Close())
require.NoError(t, testhelpers.NoErrorWithinTimeout(serveErrCh))
require.NoError(t, testhelpers.WithinTimeout(serveErrCh))
require.NoError(t, appOut.Close())
}

Expand Down Expand Up @@ -201,7 +201,7 @@ func TestAppRead(t *testing.T) {
require.NoError(t, <-errCh)

require.NoError(t, proto.Close())
require.NoError(t, testhelpers.NoErrorWithinTimeout(serveErrCh))
require.NoError(t, testhelpers.WithinTimeout(serveErrCh))
require.NoError(t, appOut.Close())
}

Expand Down Expand Up @@ -238,7 +238,7 @@ func TestAppSetup(t *testing.T) {
assert.Equal(t, "0.0.1", config.ProtocolVersion)

require.NoError(t, proto.Close())
require.NoError(t, testhelpers.NoErrorWithinTimeout(serveErrCh))
require.NoError(t, testhelpers.WithinTimeout(serveErrCh))
}

func TestAppCloseConn(t *testing.T) {
Expand All @@ -265,7 +265,7 @@ func TestAppCloseConn(t *testing.T) {
require.Len(t, app.conns, 0)

require.NoError(t, proto.Close())
require.NoError(t, testhelpers.NoErrorWithinTimeout(serveErrCh))
require.NoError(t, testhelpers.WithinTimeout(serveErrCh))
}

func TestAppClose(t *testing.T) {
Expand Down Expand Up @@ -304,7 +304,7 @@ func TestAppClose(t *testing.T) {
assert.Equal(t, routing.Port(3), loop.Remote.Port)

require.NoError(t, proto.Close())
require.NoError(t, testhelpers.NoErrorWithinTimeout(serveErrCh))
require.NoError(t, testhelpers.WithinTimeout(serveErrCh))
}

func TestAppCommand(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/router/app_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/skycoin/skycoin/src/util/logging"
Expand Down Expand Up @@ -59,7 +60,8 @@ func (am *appManager) Serve() error {
func (am *appManager) initApp(payload []byte) error {
var config app.Config
if err := json.Unmarshal(payload, &config); err != nil {
return errors.New("invalid Init payload")
fmt.Println("invalid init:", string(payload))
return fmt.Errorf("invalid INIT payload: %v", err)
}

if config.ProtocolVersion != supportedProtocolVersion {
Expand Down
8 changes: 4 additions & 4 deletions pkg/router/app_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestAppManagerInit(t *testing.T) {
require.NoError(t, in.Close())
require.NoError(t, <-srvCh)
require.NoError(t, proto.Close())
require.NoError(t, testhelpers.NoErrorWithinTimeout(serveErrCh))
require.NoError(t, testhelpers.WithinTimeout(serveErrCh))
}

func TestAppManagerSetupLoop(t *testing.T) {
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestAppManagerSetupLoop(t *testing.T) {
require.NoError(t, in.Close())
require.NoError(t, <-srvCh)
require.NoError(t, proto.Close())
require.NoError(t, testhelpers.NoErrorWithinTimeout(serveErrCh))
require.NoError(t, testhelpers.WithinTimeout(serveErrCh))
}

func TestAppManagerCloseLoop(t *testing.T) {
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestAppManagerCloseLoop(t *testing.T) {
require.NoError(t, in.Close())
require.NoError(t, <-srvCh)
require.NoError(t, proto.Close())
require.NoError(t, testhelpers.NoErrorWithinTimeout(serveErrCh))
require.NoError(t, testhelpers.WithinTimeout(serveErrCh))
}

func TestAppManagerForward(t *testing.T) {
Expand Down Expand Up @@ -166,5 +166,5 @@ func TestAppManagerForward(t *testing.T) {
require.NoError(t, in.Close())
require.NoError(t, <-srvCh)
require.NoError(t, proto.Close())
require.NoError(t, testhelpers.NoErrorWithinTimeout(serveErrCh))
require.NoError(t, testhelpers.WithinTimeout(serveErrCh))
}
16 changes: 8 additions & 8 deletions pkg/router/route_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@ import (
"github.com/skycoin/skywire/pkg/routing"
)

const (
rtGarbageCollectDuration = time.Minute * 5
)

type RMConfig struct {
SetupPKs []cipher.PubKey // Trusted setup PKs.
OnConfirmLoop func(loop routing.Loop, rule routing.Rule) (err error)
OnLoopClosed func(loop routing.Loop) error
SetupPKs []cipher.PubKey // Trusted setup PKs.
GarbageCollectDuration time.Duration
OnConfirmLoop func(loop routing.Loop, rule routing.Rule) (err error)
OnLoopClosed func(loop routing.Loop) error
}

func (sc RMConfig) SetupIsTrusted(sPK cipher.PubKey) bool {
Expand Down Expand Up @@ -135,7 +132,10 @@ func (rm *routeManager) handleSetupConn(conn net.Conn) error {
}

func (rm *routeManager) rtGarbageCollectLoop() {
ticker := time.NewTicker(rtGarbageCollectDuration)
if rm.conf.GarbageCollectDuration <= 0 {
return
}
ticker := time.NewTicker(rm.conf.GarbageCollectDuration)
defer ticker.Stop()
for {
select {
Expand Down
50 changes: 36 additions & 14 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,36 @@ import (
const (
// RouteTTL is the default expiration interval for routes
RouteTTL = 2 * time.Hour
minHops = 0
maxHops = 50

// DefaultGarbageCollectDuration is the default duration for garbage collection of routing rules.
DefaultGarbageCollectDuration = time.Second * 5

minHops = 0
maxHops = 50
)

var log = logging.MustGetLogger("router")

// Config configures Router.
type Config struct {
Logger *logging.Logger
PubKey cipher.PubKey
SecKey cipher.SecKey
TransportManager *transport.Manager
RoutingTable routing.Table
RouteFinder routeFinder.Client
SetupNodes []cipher.PubKey
Logger *logging.Logger
PubKey cipher.PubKey
SecKey cipher.SecKey
TransportManager *transport.Manager
RoutingTable routing.Table
RouteFinder routeFinder.Client
SetupNodes []cipher.PubKey
GarbageCollectDuration time.Duration
}

// SetDefaults sets default values for certain empty values.
func (c *Config) SetDefaults() {
if c.Logger == nil {
c.Logger = log
}
if c.GarbageCollectDuration <= 0 {
c.GarbageCollectDuration = DefaultGarbageCollectDuration
}
}

// Router implements node.PacketRouter. It manages routing table by
Expand All @@ -62,6 +77,8 @@ type Router struct {

// New constructs a new Router.
func New(n *snet.Network, config *Config) (*Router, error) {
config.SetDefaults()

r := &Router{
Logger: config.Logger,
n: n,
Expand All @@ -73,9 +90,10 @@ func New(n *snet.Network, config *Config) (*Router, error) {

// Prepare route manager.
rm, err := NewRouteManager(n, config.RoutingTable, RMConfig{
SetupPKs: config.SetupNodes,
OnConfirmLoop: r.confirmLoop,
OnLoopClosed: r.loopClosed,
SetupPKs: config.SetupNodes,
GarbageCollectDuration: config.GarbageCollectDuration,
OnConfirmLoop: r.confirmLoop,
OnLoopClosed: r.loopClosed,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -111,7 +129,8 @@ func (r *Router) Serve(ctx context.Context) error {
r.wg.Done()
}()

return r.tm.Serve(ctx)
r.tm.Serve(ctx)
return nil
}

func (r *Router) handlePacket(ctx context.Context, packet routing.Packet) error {
Expand Down Expand Up @@ -208,9 +227,12 @@ func (r *Router) consumePacket(payload []byte, rule routing.Rule) error {
if err != nil {
return err
}
if err := b.conn.Send(app.FrameSend, p, nil); err != nil {
fmt.Println("got it!")
if err := b.conn.Send(app.FrameSend, p, nil); err != nil { // TODO: Stuck here.
fmt.Println("err:", err)
return err
}
fmt.Println("done")

r.Logger.Infof("Forwarded packet to App on Port %d", rule.LocalPort())
return nil
Expand Down
Loading

0 comments on commit 9ad1927

Please sign in to comment.