Skip to content

Commit

Permalink
Merge branch 'fix/app2-router2-tests' of https://github.com/Darkren/s…
Browse files Browse the repository at this point in the history
…kywire-mainnet into fix/adapt-to-services
  • Loading branch information
Darkren committed Nov 22, 2019
2 parents 1c54333 + e4f2701 commit 76a6155
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 47 deletions.
14 changes: 9 additions & 5 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@ var (
ErrNoRules = errors.New("no rules")
// ErrNoTransport is returned when transport is nil.
ErrBadTransport = errors.New("bad transport")
// ErrTimeout happens if Read/Write times out.
ErrTimeout = errors.New("timeout")
)

type timeoutError struct{}

func (timeoutError) Error() string { return "timeout" }
func (timeoutError) Timeout() bool { return true }
func (timeoutError) Temporary() bool { return true }

type RouteGroupConfig struct {
ReadChBufSize int
KeepAliveInterval time.Duration
Expand Down Expand Up @@ -136,7 +140,7 @@ func (r *RouteGroup) Read(p []byte) (n int, err error) {
defer r.mu.Unlock()
return ioutil.BufRead(&r.readBuf, data, p)
case <-timeout:
return 0, ErrTimeout
return 0, timeoutError{}
}
}

Expand Down Expand Up @@ -169,7 +173,7 @@ func (r *RouteGroup) Write(p []byte) (n int, err error) {
}
return v.n, v.err
case <-timeout:
return 0, ErrTimeout
return 0, timeoutError{}
}
}

Expand Down Expand Up @@ -234,7 +238,7 @@ func (r *RouteGroup) Close() error {

r.once.Do(func() {
close(r.done)
close(r.readCh)
// close(r.readCh) // TODO: fix panics and uncomment
})

return nil
Expand Down
189 changes: 147 additions & 42 deletions pkg/router/route_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package router

import (
"context"
"math/rand"
"net"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -112,39 +115,140 @@ func TestRouteGroup_Write(t *testing.T) {
}

func TestRouteGroup_ReadWrite(t *testing.T) {
msg1 := []byte("hello1")
msg2 := []byte("hello2")

rg1 := createRouteGroup()
rg2 := createRouteGroup()

m1, m2, teardownEnv := createTransports(t, rg1, rg2)
defer teardownEnv()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go pushPackets(t, ctx, m1, rg1)
go pushPackets(t, ctx, m2, rg2)
for k := 0; k < 2; k++ {
msg1 := []byte("hello1_")
msg2 := []byte("hello2_")

_, err := rg1.Write(msg1)
require.NoError(t, err)
rg1 := createRouteGroup()
rg2 := createRouteGroup()

_, err = rg2.Write(msg2)
require.NoError(t, err)
m1, m2, _ := createTransports(t, rg1, rg2)

buf1 := make([]byte, len(msg2))
_, err = rg1.Read(buf1)
require.NoError(t, err)
require.Equal(t, msg2, buf1)
ctx, cancel := context.WithCancel(context.Background())

buf2 := make([]byte, len(msg1))
_, err = rg2.Read(buf2)
require.NoError(t, err)
require.Equal(t, msg1, buf2)
go pushPackets(t, ctx, m1, rg1)
go pushPackets(t, ctx, m2, rg2)

assert.NoError(t, rg1.Close())
assert.NoError(t, rg2.Close())
return
const iterations = 10

t.Run("Group", func(t *testing.T) {
t.Run("MultipleWriteRead", func(t *testing.T) {
for i := 0; i < iterations; i++ {
for j := 0; j < iterations; j++ {
_, err := rg1.Write(append(msg1, []byte(strconv.Itoa(j))...))
require.NoError(t, err)

_, err = rg2.Write(append(msg2, []byte(strconv.Itoa(j))...))
require.NoError(t, err)
}

for j := 0; j < iterations; j++ {
msg := append(msg2, []byte(strconv.Itoa(j))...)
buf1 := make([]byte, len(msg))
_, err := rg1.Read(buf1)
require.NoError(t, err)
require.Equal(t, msg, buf1)
}

for j := 0; j < iterations; j++ {
msg := append(msg1, []byte(strconv.Itoa(j))...)
buf2 := make([]byte, len(msg))
_, err := rg2.Read(buf2)
require.NoError(t, err)
require.Equal(t, msg, buf2)
}
}
})

t.Run("SingleReadWrite", func(t *testing.T) {
var err1, err2 error
go func() {
time.Sleep(1 * time.Second)
_, err1 = rg1.Write(msg1)
_, err2 = rg2.Write(msg2)
}()
require.NoError(t, err1)
require.NoError(t, err2)

buf1 := make([]byte, len(msg2))
_, err := rg1.Read(buf1)
require.NoError(t, err)
require.Equal(t, msg2, buf1)

buf2 := make([]byte, len(msg1))
_, err = rg2.Read(buf2)
require.NoError(t, err)
require.Equal(t, msg1, buf2)
})

t.Run("MultipleReadWrite", func(t *testing.T) {
var err1, err2 error
for i := 0; i < iterations; i++ {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

time.Sleep(100 * time.Millisecond)

for j := 0; j < iterations; j++ {
_, err := rg1.Write(append(msg1, []byte(strconv.Itoa(j))...))
require.NoError(t, err)

_, err = rg2.Write(append(msg2, []byte(strconv.Itoa(j))...))
require.NoError(t, err)
}
}()
require.NoError(t, err1)
require.NoError(t, err2)

for j := 0; j < iterations; j++ {
msg := append(msg2, []byte(strconv.Itoa(j))...)
buf1 := make([]byte, len(msg))
_, err := rg1.Read(buf1)
require.NoError(t, err)
require.Equal(t, msg, buf1)
}

for j := 0; j < iterations; j++ {
msg := append(msg1, []byte(strconv.Itoa(j))...)
buf2 := make([]byte, len(msg))
_, err := rg2.Read(buf2)
require.NoError(t, err)
require.Equal(t, msg, buf2)
}

wg.Wait()
}
})

t.Run("SingleWriteRead", func(t *testing.T) {
_, err := rg1.Write(msg1)
require.NoError(t, err)

_, err = rg2.Write(msg2)
require.NoError(t, err)

buf1 := make([]byte, len(msg2))
_, err = rg1.Read(buf1)
require.NoError(t, err)
require.Equal(t, msg2, buf1)

buf2 := make([]byte, len(msg1))
_, err = rg2.Read(buf2)
require.NoError(t, err)
require.Equal(t, msg1, buf2)
})
})

cancel()

assert.NoError(t, rg1.Close())
assert.NoError(t, rg2.Close())

// TODO: uncomment
// teardownEnv()
}
}

func TestRouteGroup_LocalAddr(t *testing.T) {
Expand Down Expand Up @@ -183,23 +287,23 @@ func TestRouteGroup_SetDeadline(t *testing.T) {
}

func TestRouteGroup_TestConn(t *testing.T) {
mp := func() (c1, c2 net.Conn, stop func(), err error) {
rg1 := createRouteGroup()
rg2 := createRouteGroup()
rg1 := createRouteGroup()
rg2 := createRouteGroup()

c1, c2 = rg1, rg2
// c1, c2 = rg1, rg2

m1, m2, teardownEnv := createTransports(t, rg1, rg2)
ctx, cancel := context.WithCancel(context.Background())
m1, m2, _ := createTransports(t, rg1, rg2)
ctx, _ := context.WithCancel(context.Background())

go pushPackets(t, ctx, m1, rg1)
go pushPackets(t, ctx, m2, rg2)
go pushPackets(t, ctx, m1, rg1)
go pushPackets(t, ctx, m2, rg2)

mp := func() (c1, c2 net.Conn, stop func(), err error) {
c1, c2 = rg1, rg2
stop = func() {
cancel()
teardownEnv()
assert.NoError(t, c1.Close())
assert.NoError(t, c2.Close())
// TODO: uncomment
// cancel()
// teardownEnv()
}
return
}
Expand Down Expand Up @@ -250,8 +354,9 @@ func createTransports(t *testing.T, rg1, rg2 *RouteGroup) (m1, m2 *transport.Man
require.NotNil(t, tp2.Entry)

keepAlive := 1 * time.Hour
id1 := routing.RouteID(1)
id2 := routing.RouteID(2)
// TODO: remove rand
id1 := routing.RouteID(rand.Int())
id2 := routing.RouteID(rand.Int())
port1 := routing.Port(1)
port2 := routing.Port(2)
rule1 := routing.ForwardRule(keepAlive, id1, id2, tp2.Entry.ID, keys[0].PK, port1, port2)
Expand Down

0 comments on commit 76a6155

Please sign in to comment.