Skip to content

Commit

Permalink
Merge branch 'fix/app2-router2-tests' of https://github.com/Darkren/s…
Browse files Browse the repository at this point in the history
…kywire-mainnet into fix/adapt-to-services
  • Loading branch information
Darkren committed Nov 28, 2019
2 parents 415b4fd + 6a78de7 commit 1d57ce1
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 56 deletions.
84 changes: 57 additions & 27 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"sync/atomic"
"time"

"github.com/SkycoinProject/dmsg/ioutil"
"github.com/SkycoinProject/skycoin/src/util/logging"

"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
Expand Down Expand Up @@ -82,8 +81,10 @@ type RouteGroup struct {
done chan struct{}
once sync.Once

readTimer *time.Timer
writeTimer *time.Timer
readTimer *time.Timer
writeTimer *time.Timer
// TODO: try to implement timed out flags with open/closed chan struct{}
// in order to be able to check timeout in select statement
readTimedOut atomicbool.Bool // set true when read deadline has been reached
writeTimedOut atomicbool.Bool // set true when write deadline has been reached
}
Expand All @@ -94,7 +95,7 @@ func NewRouteGroup(cfg *RouteGroupConfig, rt routing.Table, desc routing.RouteDe
}

rg := &RouteGroup{
logger: logging.MustGetLogger(fmt.Sprintf("RouteGroup %v", desc)),
logger: logging.MustGetLogger(fmt.Sprintf("RouteGroup %s", desc.String())),
desc: desc,
rt: rt,
tps: make([]*transport.ManagedTransport, 0),
Expand All @@ -115,7 +116,12 @@ func NewRouteGroup(cfg *RouteGroupConfig, rt routing.Table, desc routing.RouteDe
// to the appropriate RouteGroup via (*RouteGroup).readCh.
// To help with implementing the read logic, within the dmsg repo, we have ioutil.BufRead,
// just in case the read buffer is short.
// TODO: too long, simplify
func (r *RouteGroup) Read(p []byte) (n int, err error) {
if r.isClosed() {
return 0, io.ErrClosedPipe
}

if r.readTimedOut.IsSet() {
r.logger.Infoln("TIMEOUT ERROR?")
return 0, timeoutError{}
Expand All @@ -125,43 +131,68 @@ func (r *RouteGroup) Read(p []byte) (n int, err error) {
return 0, nil
}

r.mu.Lock()
if r.readBuf.Len() > 0 {
r.logger.Infoln("BLOCKING BEFORE BUF READ")
data, err := r.readBuf.Read(p)
r.logger.Infoln("GOT SOME FROM BUF READ")
r.mu.Unlock()

return data, err
}
r.mu.Unlock()
// TODO: use readBuf
// r.mu.Lock()
// if r.readBuf.Len() > 0 {
// data, err := r.readBuf.Read(p)
// r.mu.Unlock()
//
// return data, err
// }
// r.mu.Unlock()

timeout := make(chan struct{})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
ticker := time.NewTicker(100 * time.Millisecond)
loop:
for {
select {
case <-ticker.C:
if r.readTimedOut.IsSet() {
close(timeout)
break loop
}
case <-ctx.Done():
break loop
}
}
ticker.Stop()
}()

r.logger.Infoln("BLOCKING BEFORE READ CHAN")
data, ok := <-r.readCh
if !ok {
r.logger.Infof("COULDN'T READ DATA")
return 0, io.ErrClosedPipe
var data []byte
select {
case data = <-r.readCh:
case <-timeout:
return 0, timeoutError{}
case <-time.After(5 * time.Second):
return 0, io.EOF
}

r.mu.Lock()
defer r.mu.Unlock()

r.logger.Infof("READ DATA FROM CHAN: %s", data)
// return ioutil.BufRead(&r.readBuf, data, p)

n = copy(p, data)

return ioutil.BufRead(&r.readBuf, data, p)
return n, nil
}

// Write writes payload to a RouteGroup
// For the first version, only the first ForwardRule (fwd[0]) is used for writing.
func (r *RouteGroup) Write(p []byte) (n int, err error) {
if r.writeTimedOut.IsSet() {
return 0, timeoutError{}
}

if r.isClosed() {
return 0, io.ErrClosedPipe
}

if r.writeTimedOut.IsSet() {
return 0, timeoutError{}
}

r.mu.Lock()
defer r.mu.Unlock()

Expand Down Expand Up @@ -194,7 +225,6 @@ func (r *RouteGroup) Write(p []byte) (n int, err error) {
// - Send Close packet for all ForwardRules.
// - Delete all rules (ForwardRules and ConsumeRules) from routing table.
// - Close all go channels.
// TODO: fix hang after read
func (r *RouteGroup) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
Expand All @@ -221,7 +251,7 @@ func (r *RouteGroup) Close() error {

r.once.Do(func() {
close(r.done)
close(r.readCh)
// close(r.readCh) // TODO: uncomment
})

return nil
Expand Down
169 changes: 141 additions & 28 deletions pkg/router/route_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/rand"
"net"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -41,37 +42,13 @@ func TestRouteGroup_Read(t *testing.T) {
buf2 := make([]byte, len(msg2))

rg1 := createRouteGroup()
require.NotNil(t, rg1)

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(3*time.Second))
defer cancel()

errCh := make(chan error, 1)

go func() {
_, err := rg1.Read(buf1)
errCh <- err
}()

var err error
select {
case <-ctx.Done():
err = ctx.Err()
case err = <-errCh:
}
require.Equal(t, context.DeadlineExceeded, err)
require.NoError(t, rg1.Close())

rg1 = createRouteGroup()
rg2 := createRouteGroup()

_, _, teardown := createTransports(t, rg1, rg2)
defer teardown()

go func() {
rg1.readCh <- msg1
rg2.readCh <- msg2
}()
rg1.readCh <- msg1
rg2.readCh <- msg2

n, err := rg1.Read(buf1)
require.NoError(t, err)
Expand Down Expand Up @@ -282,6 +259,126 @@ func testMultipleWR(t *testing.T, iterations int, rg1, rg2 io.ReadWriter, msg1,
}
}

func TestArbitrarySizeOneMessage(t *testing.T) {
// Test fails if message size is above 4059
const (
value1 = 4058
value2 = 4059
)

var wg sync.WaitGroup

wg.Add(1)

t.Run("Value1", func(t *testing.T) {
defer wg.Done()
testArbitrarySizeOneMessage(t, value1)
})

wg.Wait()

t.Run("Value2", func(t *testing.T) {
testArbitrarySizeOneMessage(t, value2)
})
}

func TestArbitrarySizeMultipleMessagesByChunks(t *testing.T) {
// Test fails if message size is above 64810
const (
value1 = 64810 // 2^16 - 726
value2 = 64811 // 2^16 - 725
)

var wg sync.WaitGroup

wg.Add(1)

t.Run("Value1", func(t *testing.T) {
defer wg.Done()
testArbitrarySizeMultipleMessagesByChunks(t, value1)
})

wg.Wait()

t.Run("Value2", func(t *testing.T) {
testArbitrarySizeMultipleMessagesByChunks(t, value2)
})
}

func testArbitrarySizeMultipleMessagesByChunks(t *testing.T, size int) {
rg1 := createRouteGroup()
rg2 := createRouteGroup()
m1, m2, teardownEnv := createTransports(t, rg1, rg2)

ctx, cancel := context.WithCancel(context.Background())

defer func() {
cancel()
teardownEnv()
}()

go pushPackets(ctx, t, m1, rg1)

go pushPackets(ctx, t, m2, rg2)

chunkSize := 1024

msg := []byte(strings.Repeat("A", size))

for offset := 0; offset < size; offset += chunkSize {
_, err := rg1.Write(msg[offset : offset+chunkSize])
require.NoError(t, err)
}

for offset := 0; offset < size; offset += chunkSize {
buf := make([]byte, chunkSize)
n, err := rg2.Read(buf)
require.NoError(t, err)
require.Equal(t, chunkSize, n)
require.Equal(t, msg[offset:offset+chunkSize], buf)
}

buf := make([]byte, chunkSize)
n, err := rg2.Read(buf)
assert.Equal(t, io.EOF, err)
assert.Equal(t, 0, n)
assert.Equal(t, make([]byte, chunkSize), buf)
}

func testArbitrarySizeOneMessage(t *testing.T, size int) {
rg1 := createRouteGroup()
rg2 := createRouteGroup()
m1, m2, teardownEnv := createTransports(t, rg1, rg2)

ctx, cancel := context.WithCancel(context.Background())

defer func() {
cancel()
teardownEnv()
}()

go pushPackets(ctx, t, m1, rg1)

go pushPackets(ctx, t, m2, rg2)

msg := []byte(strings.Repeat("A", size))

_, err := rg1.Write(msg)
require.NoError(t, err)

buf := make([]byte, size)
n, err := rg2.Read(buf)
require.NoError(t, err)
require.Equal(t, size, n)
require.Equal(t, msg, buf)

buf = make([]byte, size)
n, err = rg2.Read(buf)
require.Equal(t, io.EOF, err)
require.Equal(t, 0, n)
require.Equal(t, make([]byte, size), buf)
}

func TestRouteGroup_LocalAddr(t *testing.T) {
rg := createRouteGroup()
require.Equal(t, rg.desc.Src(), rg.LocalAddr())
Expand Down Expand Up @@ -391,12 +488,22 @@ func pushPackets(ctx context.Context, t *testing.T, from *transport.Manager, to
default:
packet, err := from.ReadPacket()
assert.NoError(t, err)

if packet.Type() != routing.DataPacket {
continue
}

payload := packet.Payload()
if len(payload) != int(packet.Size()) {
panic("malformed packet")
}

select {
case <-ctx.Done():
return
case <-to.done:
return
case to.readCh <- packet.Payload():
case to.readCh <- payload:
}
}
}
Expand All @@ -411,7 +518,8 @@ func createRouteGroup() *RouteGroup {
port2 := routing.Port(2)
desc := routing.NewRouteDescriptor(pk1, pk2, port1, port2)

rg := NewRouteGroup(DefaultRouteGroupConfig(), rt, desc)
cfg := DefaultRouteGroupConfig()
rg := NewRouteGroup(cfg, rt, desc)

return rg
}
Expand All @@ -438,10 +546,15 @@ func createTransports(t *testing.T, rg1, rg2 *RouteGroup) (m1, m2 *transport.Man
rule1 := routing.ForwardRule(keepAlive, id1, id2, tp2.Entry.ID, keys[0].PK, port1, port2)
rule2 := routing.ForwardRule(keepAlive, id2, id1, tp1.Entry.ID, keys[1].PK, port2, port1)

rg1.mu.Lock()
rg1.tps = append(rg1.tps, tp1)
rg1.fwd = append(rg1.fwd, rule1)
rg1.mu.Unlock()

rg2.mu.Lock()
rg2.tps = append(rg2.tps, tp2)
rg2.fwd = append(rg2.fwd, rule2)
rg2.mu.Unlock()

return m1, m2, func() {
nEnv.Teardown()
Expand Down
1 change: 1 addition & 0 deletions pkg/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func New(n *snet.Network, config *Config) (Router, error) {
rgs: make(map[routing.RouteDescriptor]*RouteGroup),
rpcSrv: rpc.NewServer(),
accept: make(chan routing.EdgeRules, acceptSize),
done: make(chan struct{}),
trustedNodes: trustedNodes,
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/routing/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,5 @@ func (p Packet) RouteID() RouteID {

// Payload returns payload from a Packet.
func (p Packet) Payload() []byte {
return p[PacketPayloadOffset:]
return p[PacketPayloadOffset:] // TODO: consider checking if real payload size differs
}

0 comments on commit 1d57ce1

Please sign in to comment.