From ffe80e0610149b8b625c3eb4214e502b8890abe4 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 18 Nov 2019 23:01:36 +0300 Subject: [PATCH] Implement basic RouteGroup tests --- pkg/router/route_group.go | 77 +++++++++++++++++++++---- pkg/router/route_group_test.go | 101 +++++++++++++++++++++++++++++++-- pkg/router/router.go | 1 - 3 files changed, 161 insertions(+), 18 deletions(-) diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 3ab1a7795..be13ee0e7 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -30,6 +30,8 @@ var ( ErrNoRules = errors.New("no rules") // ErrNoTransport is returned when transport is nil. ErrBadTransport = errors.New("bad transport") + // ErrTimeout happens if Read/Write times out. + ErrTimeout = errors.New("timeout") ) type RouteGroupConfig struct { @@ -65,7 +67,9 @@ type RouteGroup struct { fwd []routing.Rule // forward rules (for writing) rvs []routing.Rule // reverse rules (for reading) - lastSent int64 + lastSent int64 + readDeadline int64 + writeDeadline int64 // 'readCh' reads in incoming packets of this route group. // - Router should serve call '(*transport.Manager).ReadPacket' in a loop, @@ -105,21 +109,68 @@ func (r *RouteGroup) Read(p []byte) (n int, err error) { r.mu.Lock() defer r.mu.Unlock() - if r.readBuf.Len() > 0 { - return r.readBuf.Read(p) - } + var timeout <-chan time.Time + var timer *time.Timer - data, ok := <-r.readCh - if !ok { - return 0, io.ErrClosedPipe + if deadline := atomic.LoadInt64(&r.readDeadline); deadline != 0 { + delay := time.Until(time.Unix(0, deadline)) + + if delay > time.Duration(0) && r.readBuf.Len() > 0 { + return r.readBuf.Read(p) + } + + timer = time.NewTimer(delay) + timeout = timer.C } - return ioutil.BufRead(&r.readBuf, data, p) + select { + case data, ok := <-r.readCh: + if timer != nil { + timer.Stop() + } + if !ok { + return 0, io.ErrClosedPipe + } + return ioutil.BufRead(&r.readBuf, data, p) + case <-timeout: + return 0, ErrTimeout + } } // Write writes payload to a RouteGroup // For the first version, only the first ForwardRule (fwd[0]) is used for writing. func (r *RouteGroup) Write(p []byte) (n int, err error) { + var timeout <-chan time.Time + var timer *time.Timer + if deadline := atomic.LoadInt64(&r.writeDeadline); deadline != 0 { + delay := time.Until(time.Unix(0, deadline)) + timer = time.NewTimer(delay) + timeout = timer.C + } + + type values struct { + n int + err error + } + + ch := make(chan values, 1) + go func() { + n, err := r.write(p) + ch <- values{n, err} + }() + + select { + case v := <-ch: + if timer != nil { + timer.Stop() + } + return v.n, v.err + case <-timeout: + return 0, ErrTimeout + } +} + +func (r *RouteGroup) write(p []byte) (n int, err error) { if r.isClosed() { return 0, io.ErrClosedPipe } @@ -193,18 +244,20 @@ func (r *RouteGroup) RemoteAddr() net.Addr { return r.desc.Dst() } -// TODO(nkryuchkov): implement func (r *RouteGroup) SetDeadline(t time.Time) error { - return nil + if err := r.SetReadDeadline(t); err != nil { + return err + } + return r.SetWriteDeadline(t) } -// TODO(nkryuchkov): implement func (r *RouteGroup) SetReadDeadline(t time.Time) error { + atomic.StoreInt64(&r.readDeadline, t.UnixNano()) return nil } -// TODO(nkryuchkov): implement func (r *RouteGroup) SetWriteDeadline(t time.Time) error { + atomic.StoreInt64(&r.writeDeadline, t.UnixNano()) return nil } diff --git a/pkg/router/route_group_test.go b/pkg/router/route_group_test.go index f7ec71fa2..a9b497ea9 100644 --- a/pkg/router/route_group_test.go +++ b/pkg/router/route_group_test.go @@ -1,15 +1,110 @@ package router import ( + "context" + "net" "testing" + "time" "github.com/SkycoinProject/dmsg/cipher" "github.com/stretchr/testify/require" + "golang.org/x/net/nettest" "github.com/SkycoinProject/skywire-mainnet/pkg/routing" ) func TestNewRouteGroup(t *testing.T) { + _, rg := prepare() + require.NotNil(t, rg) +} + +func TestRouteGroup_Close(t *testing.T) { + _, rg := prepare() + require.NotNil(t, rg) + + require.False(t, rg.isClosed()) + require.NoError(t, rg.Close()) + require.True(t, rg.isClosed()) +} + +// TODO: implement better tests +func TestRouteGroup_Read(t *testing.T) { + _, rg := prepare() + require.NotNil(t, rg) + + buf := make([]byte, defaultReadChBufSize) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(3*time.Second)) + defer cancel() + + errCh := make(chan error, 1) + go func() { + _, err := rg.Read(buf) + errCh <- err + }() + + var err error + select { + case <-ctx.Done(): + err = ctx.Err() + case err = <-errCh: + } + require.Equal(t, context.DeadlineExceeded, err) +} + +// TODO: implement better tests +func TestRouteGroup_Write(t *testing.T) { + _, rg := prepare() + require.NotNil(t, rg) + + buf := make([]byte, defaultReadChBufSize) + _, err := rg.Write(buf) + require.Equal(t, ErrNoTransports, err) +} + +func TestRouteGroup_LocalAddr(t *testing.T) { + desc, rg := prepare() + require.Equal(t, desc.Src(), rg.LocalAddr()) +} + +func TestRouteGroup_RemoteAddr(t *testing.T) { + desc, rg := prepare() + require.Equal(t, desc.Dst(), rg.RemoteAddr()) +} + +func TestRouteGroup_SetReadDeadline(t *testing.T) { + _, rg := prepare() + now := time.Now() + + require.NoError(t, rg.SetReadDeadline(now)) + require.Equal(t, now.UnixNano(), rg.readDeadline) +} + +func TestRouteGroup_SetWriteDeadline(t *testing.T) { + _, rg := prepare() + now := time.Now() + + require.NoError(t, rg.SetWriteDeadline(now)) + require.Equal(t, now.UnixNano(), rg.writeDeadline) +} + +func TestRouteGroup_SetDeadline(t *testing.T) { + _, rg := prepare() + now := time.Now() + + require.NoError(t, rg.SetDeadline(now)) + require.Equal(t, now.UnixNano(), rg.readDeadline) + require.Equal(t, now.UnixNano(), rg.writeDeadline) +} + +func TestRouteGroup_TestConn(t *testing.T) { + mp := func() (c1, c2 net.Conn, stop func(), err error) { + // TODO: implement + return + } + nettest.TestConn(t, mp) +} + +func prepare() (routing.RouteDescriptor, *RouteGroup) { rt := routing.NewTable(routing.DefaultConfig()) pk1, _ := cipher.GenerateKeyPair() @@ -19,9 +114,5 @@ func TestNewRouteGroup(t *testing.T) { desc := routing.NewRouteDescriptor(pk1, pk2, port1, port2) rg := NewRouteGroup(DefaultRouteGroupConfig(), rt, desc) - require.NotNil(t, rg) - - require.False(t, rg.isClosed()) - - require.NoError(t, rg.Close()) + return desc, rg } diff --git a/pkg/router/router.go b/pkg/router/router.go index e31fa78ec..67726a5d2 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -74,7 +74,6 @@ func DefaultDialOptions() DialOptions { } } -// TODO(nkryuchkov): consider moving to visor package type Router interface { io.Closer