Skip to content

Commit

Permalink
Implement basic RouteGroup tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Nov 18, 2019
1 parent 8ec4e2a commit ffe80e0
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 18 deletions.
77 changes: 65 additions & 12 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var (
ErrNoRules = errors.New("no rules")
// ErrNoTransport is returned when transport is nil.
ErrBadTransport = errors.New("bad transport")
// ErrTimeout happens if Read/Write times out.
ErrTimeout = errors.New("timeout")
)

type RouteGroupConfig struct {
Expand Down Expand Up @@ -65,7 +67,9 @@ type RouteGroup struct {
fwd []routing.Rule // forward rules (for writing)
rvs []routing.Rule // reverse rules (for reading)

lastSent int64
lastSent int64
readDeadline int64
writeDeadline int64

// 'readCh' reads in incoming packets of this route group.
// - Router should serve call '(*transport.Manager).ReadPacket' in a loop,
Expand Down Expand Up @@ -105,21 +109,68 @@ func (r *RouteGroup) Read(p []byte) (n int, err error) {
r.mu.Lock()
defer r.mu.Unlock()

if r.readBuf.Len() > 0 {
return r.readBuf.Read(p)
}
var timeout <-chan time.Time
var timer *time.Timer

data, ok := <-r.readCh
if !ok {
return 0, io.ErrClosedPipe
if deadline := atomic.LoadInt64(&r.readDeadline); deadline != 0 {
delay := time.Until(time.Unix(0, deadline))

if delay > time.Duration(0) && r.readBuf.Len() > 0 {
return r.readBuf.Read(p)
}

timer = time.NewTimer(delay)
timeout = timer.C
}

return ioutil.BufRead(&r.readBuf, data, p)
select {
case data, ok := <-r.readCh:
if timer != nil {
timer.Stop()
}
if !ok {
return 0, io.ErrClosedPipe
}
return ioutil.BufRead(&r.readBuf, data, p)
case <-timeout:
return 0, ErrTimeout
}
}

// Write writes payload to a RouteGroup
// For the first version, only the first ForwardRule (fwd[0]) is used for writing.
func (r *RouteGroup) Write(p []byte) (n int, err error) {
var timeout <-chan time.Time
var timer *time.Timer
if deadline := atomic.LoadInt64(&r.writeDeadline); deadline != 0 {
delay := time.Until(time.Unix(0, deadline))
timer = time.NewTimer(delay)
timeout = timer.C
}

type values struct {
n int
err error
}

ch := make(chan values, 1)
go func() {
n, err := r.write(p)
ch <- values{n, err}
}()

select {
case v := <-ch:
if timer != nil {
timer.Stop()
}
return v.n, v.err
case <-timeout:
return 0, ErrTimeout
}
}

func (r *RouteGroup) write(p []byte) (n int, err error) {
if r.isClosed() {
return 0, io.ErrClosedPipe
}
Expand Down Expand Up @@ -193,18 +244,20 @@ func (r *RouteGroup) RemoteAddr() net.Addr {
return r.desc.Dst()
}

// TODO(nkryuchkov): implement
func (r *RouteGroup) SetDeadline(t time.Time) error {
return nil
if err := r.SetReadDeadline(t); err != nil {
return err
}
return r.SetWriteDeadline(t)
}

// TODO(nkryuchkov): implement
func (r *RouteGroup) SetReadDeadline(t time.Time) error {
atomic.StoreInt64(&r.readDeadline, t.UnixNano())
return nil
}

// TODO(nkryuchkov): implement
func (r *RouteGroup) SetWriteDeadline(t time.Time) error {
atomic.StoreInt64(&r.writeDeadline, t.UnixNano())
return nil
}

Expand Down
101 changes: 96 additions & 5 deletions pkg/router/route_group_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,110 @@
package router

import (
"context"
"net"
"testing"
"time"

"github.com/SkycoinProject/dmsg/cipher"
"github.com/stretchr/testify/require"
"golang.org/x/net/nettest"

"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
)

func TestNewRouteGroup(t *testing.T) {
_, rg := prepare()
require.NotNil(t, rg)
}

func TestRouteGroup_Close(t *testing.T) {
_, rg := prepare()
require.NotNil(t, rg)

require.False(t, rg.isClosed())
require.NoError(t, rg.Close())
require.True(t, rg.isClosed())
}

// TODO: implement better tests
func TestRouteGroup_Read(t *testing.T) {
_, rg := prepare()
require.NotNil(t, rg)

buf := make([]byte, defaultReadChBufSize)
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(3*time.Second))
defer cancel()

errCh := make(chan error, 1)
go func() {
_, err := rg.Read(buf)
errCh <- err
}()

var err error
select {
case <-ctx.Done():
err = ctx.Err()
case err = <-errCh:
}
require.Equal(t, context.DeadlineExceeded, err)
}

// TODO: implement better tests
func TestRouteGroup_Write(t *testing.T) {
_, rg := prepare()
require.NotNil(t, rg)

buf := make([]byte, defaultReadChBufSize)
_, err := rg.Write(buf)
require.Equal(t, ErrNoTransports, err)
}

func TestRouteGroup_LocalAddr(t *testing.T) {
desc, rg := prepare()
require.Equal(t, desc.Src(), rg.LocalAddr())
}

func TestRouteGroup_RemoteAddr(t *testing.T) {
desc, rg := prepare()
require.Equal(t, desc.Dst(), rg.RemoteAddr())
}

func TestRouteGroup_SetReadDeadline(t *testing.T) {
_, rg := prepare()
now := time.Now()

require.NoError(t, rg.SetReadDeadline(now))
require.Equal(t, now.UnixNano(), rg.readDeadline)
}

func TestRouteGroup_SetWriteDeadline(t *testing.T) {
_, rg := prepare()
now := time.Now()

require.NoError(t, rg.SetWriteDeadline(now))
require.Equal(t, now.UnixNano(), rg.writeDeadline)
}

func TestRouteGroup_SetDeadline(t *testing.T) {
_, rg := prepare()
now := time.Now()

require.NoError(t, rg.SetDeadline(now))
require.Equal(t, now.UnixNano(), rg.readDeadline)
require.Equal(t, now.UnixNano(), rg.writeDeadline)
}

func TestRouteGroup_TestConn(t *testing.T) {
mp := func() (c1, c2 net.Conn, stop func(), err error) {
// TODO: implement
return
}
nettest.TestConn(t, mp)
}

func prepare() (routing.RouteDescriptor, *RouteGroup) {
rt := routing.NewTable(routing.DefaultConfig())

pk1, _ := cipher.GenerateKeyPair()
Expand All @@ -19,9 +114,5 @@ func TestNewRouteGroup(t *testing.T) {
desc := routing.NewRouteDescriptor(pk1, pk2, port1, port2)

rg := NewRouteGroup(DefaultRouteGroupConfig(), rt, desc)
require.NotNil(t, rg)

require.False(t, rg.isClosed())

require.NoError(t, rg.Close())
return desc, rg
}
1 change: 0 additions & 1 deletion pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func DefaultDialOptions() DialOptions {
}
}

// TODO(nkryuchkov): consider moving to visor package
type Router interface {
io.Closer

Expand Down

0 comments on commit ffe80e0

Please sign in to comment.