From da68101ee2f6eadc64fb5169b951d17e84418ccf Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 27 Nov 2019 18:48:46 +0300 Subject: [PATCH 1/3] Cancel reading if deadline occurs, improve RouteGroup implementation --- pkg/router/route_group.go | 80 ++++++++++++++++++++++++---------- pkg/router/route_group_test.go | 48 +++++++++----------- pkg/routing/packet.go | 2 +- 3 files changed, 79 insertions(+), 51 deletions(-) diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index be140305b..ba63e1d70 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -11,7 +11,6 @@ import ( "sync/atomic" "time" - "github.com/SkycoinProject/dmsg/ioutil" "github.com/SkycoinProject/skycoin/src/util/logging" "github.com/SkycoinProject/skywire-mainnet/pkg/routing" @@ -82,8 +81,10 @@ type RouteGroup struct { done chan struct{} once sync.Once - readTimer *time.Timer - writeTimer *time.Timer + readTimer *time.Timer + writeTimer *time.Timer + // TODO: try to implement timed out flags with open/closed chan struct{} + // in order to be able to check timeout in select statement readTimedOut atomicbool.Bool // set true when read deadline has been reached writeTimedOut atomicbool.Bool // set true when write deadline has been reached } @@ -94,7 +95,7 @@ func NewRouteGroup(cfg *RouteGroupConfig, rt routing.Table, desc routing.RouteDe } rg := &RouteGroup{ - logger: logging.MustGetLogger(fmt.Sprintf("RouteGroup %v", desc)), + logger: logging.MustGetLogger(fmt.Sprintf("RouteGroup %s", desc.String())), desc: desc, rt: rt, tps: make([]*transport.ManagedTransport, 0), @@ -115,7 +116,12 @@ func NewRouteGroup(cfg *RouteGroupConfig, rt routing.Table, desc routing.RouteDe // to the appropriate RouteGroup via (*RouteGroup).readCh. // To help with implementing the read logic, within the dmsg repo, we have ioutil.BufRead, // just in case the read buffer is short. +// TODO: too long, simplify func (r *RouteGroup) Read(p []byte) (n int, err error) { + if r.isClosed() { + return 0, io.ErrClosedPipe + } + if r.readTimedOut.IsSet() { return 0, timeoutError{} } @@ -124,37 +130,68 @@ func (r *RouteGroup) Read(p []byte) (n int, err error) { return 0, nil } - r.mu.Lock() - if r.readBuf.Len() > 0 { - data, err := r.readBuf.Read(p) - r.mu.Unlock() - - return data, err - } - r.mu.Unlock() + // TODO: use readBuf + // r.mu.Lock() + // if r.readBuf.Len() > 0 { + // data, err := r.readBuf.Read(p) + // r.mu.Unlock() + // + // return data, err + // } + // r.mu.Unlock() + + timeout := make(chan struct{}) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + ticker := time.NewTicker(100 * time.Millisecond) + loop: + for { + select { + case <-ticker.C: + if r.readTimedOut.IsSet() { + close(timeout) + break loop + } + case <-ctx.Done(): + break loop + } + } + ticker.Stop() + }() - data, ok := <-r.readCh - if !ok { - return 0, io.ErrClosedPipe + var data []byte + select { + case data = <-r.readCh: + case <-timeout: + return 0, timeoutError{} + case <-time.After(5 * time.Second): + return 0, io.EOF } r.mu.Lock() defer r.mu.Unlock() - return ioutil.BufRead(&r.readBuf, data, p) + // return ioutil.BufRead(&r.readBuf, data, p) + + n = copy(p, data) + + return n, nil } // Write writes payload to a RouteGroup // For the first version, only the first ForwardRule (fwd[0]) is used for writing. func (r *RouteGroup) Write(p []byte) (n int, err error) { - if r.writeTimedOut.IsSet() { - return 0, timeoutError{} - } - if r.isClosed() { return 0, io.ErrClosedPipe } + if r.writeTimedOut.IsSet() { + return 0, timeoutError{} + } + r.mu.Lock() defer r.mu.Unlock() @@ -187,7 +224,6 @@ func (r *RouteGroup) Write(p []byte) (n int, err error) { // - Send Close packet for all ForwardRules. // - Delete all rules (ForwardRules and ConsumeRules) from routing table. // - Close all go channels. -// TODO: fix hang after read func (r *RouteGroup) Close() error { r.mu.Lock() defer r.mu.Unlock() @@ -214,7 +250,7 @@ func (r *RouteGroup) Close() error { r.once.Do(func() { close(r.done) - close(r.readCh) + // close(r.readCh) // TODO: uncomment }) return nil diff --git a/pkg/router/route_group_test.go b/pkg/router/route_group_test.go index 8c77ad37d..d7141d022 100644 --- a/pkg/router/route_group_test.go +++ b/pkg/router/route_group_test.go @@ -41,37 +41,13 @@ func TestRouteGroup_Read(t *testing.T) { buf2 := make([]byte, len(msg2)) rg1 := createRouteGroup() - require.NotNil(t, rg1) - - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(3*time.Second)) - defer cancel() - - errCh := make(chan error, 1) - - go func() { - _, err := rg1.Read(buf1) - errCh <- err - }() - - var err error - select { - case <-ctx.Done(): - err = ctx.Err() - case err = <-errCh: - } - require.Equal(t, context.DeadlineExceeded, err) - require.NoError(t, rg1.Close()) - - rg1 = createRouteGroup() rg2 := createRouteGroup() _, _, teardown := createTransports(t, rg1, rg2) defer teardown() - go func() { - rg1.readCh <- msg1 - rg2.readCh <- msg2 - }() + rg1.readCh <- msg1 + rg2.readCh <- msg2 n, err := rg1.Read(buf1) require.NoError(t, err) @@ -391,12 +367,22 @@ func pushPackets(ctx context.Context, t *testing.T, from *transport.Manager, to default: packet, err := from.ReadPacket() assert.NoError(t, err) + + if packet.Type() != routing.DataPacket { + continue + } + + payload := packet.Payload() + if len(payload) != int(packet.Size()) { + panic("malformed packet") + } + select { case <-ctx.Done(): return case <-to.done: return - case to.readCh <- packet.Payload(): + case to.readCh <- payload: } } } @@ -411,7 +397,8 @@ func createRouteGroup() *RouteGroup { port2 := routing.Port(2) desc := routing.NewRouteDescriptor(pk1, pk2, port1, port2) - rg := NewRouteGroup(DefaultRouteGroupConfig(), rt, desc) + cfg := DefaultRouteGroupConfig() + rg := NewRouteGroup(cfg, rt, desc) return rg } @@ -438,10 +425,15 @@ func createTransports(t *testing.T, rg1, rg2 *RouteGroup) (m1, m2 *transport.Man rule1 := routing.ForwardRule(keepAlive, id1, id2, tp2.Entry.ID, keys[0].PK, port1, port2) rule2 := routing.ForwardRule(keepAlive, id2, id1, tp1.Entry.ID, keys[1].PK, port2, port1) + rg1.mu.Lock() rg1.tps = append(rg1.tps, tp1) rg1.fwd = append(rg1.fwd, rule1) + rg1.mu.Unlock() + + rg2.mu.Lock() rg2.tps = append(rg2.tps, tp2) rg2.fwd = append(rg2.fwd, rule2) + rg2.mu.Unlock() return m1, m2, func() { nEnv.Teardown() diff --git a/pkg/routing/packet.go b/pkg/routing/packet.go index c7c7cfa18..1ffbda729 100644 --- a/pkg/routing/packet.go +++ b/pkg/routing/packet.go @@ -127,5 +127,5 @@ func (p Packet) RouteID() RouteID { // Payload returns payload from a Packet. func (p Packet) Payload() []byte { - return p[PacketPayloadOffset:] + return p[PacketPayloadOffset:] // TODO: consider checking if real payload size differs } From 064c9a09935a1c4449e81dc3aac1acd46032bfd9 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 28 Nov 2019 11:47:19 +0300 Subject: [PATCH 2/3] Add tests for RouteGroup with arbitrary payload size --- pkg/router/route_group_test.go | 121 +++++++++++++++++++++++++++++++++ 1 file changed, 121 insertions(+) diff --git a/pkg/router/route_group_test.go b/pkg/router/route_group_test.go index d7141d022..a3aa0f15c 100644 --- a/pkg/router/route_group_test.go +++ b/pkg/router/route_group_test.go @@ -6,6 +6,7 @@ import ( "math/rand" "net" "strconv" + "strings" "sync" "testing" "time" @@ -258,6 +259,126 @@ func testMultipleWR(t *testing.T, iterations int, rg1, rg2 io.ReadWriter, msg1, } } +func TestArbitrarySizeOneMessage(t *testing.T) { + // Test fails if message size is above 4059 + const ( + value1 = 4058 + value2 = 4059 + ) + + var wg sync.WaitGroup + + wg.Add(1) + + t.Run("Value1", func(t *testing.T) { + defer wg.Done() + testArbitrarySizeOneMessage(t, value1) + }) + + wg.Wait() + + t.Run("Value2", func(t *testing.T) { + testArbitrarySizeOneMessage(t, value2) + }) +} + +func TestArbitrarySizeMultipleMessagesByChunks(t *testing.T) { + // Test fails if message size is above 64810 + const ( + value1 = 64810 // 2^16 - 726 + value2 = 64811 // 2^16 - 725 + ) + + var wg sync.WaitGroup + + wg.Add(1) + + t.Run("Value1", func(t *testing.T) { + defer wg.Done() + testArbitrarySizeMultipleMessagesByChunks(t, value1) + }) + + wg.Wait() + + t.Run("Value2", func(t *testing.T) { + testArbitrarySizeMultipleMessagesByChunks(t, value2) + }) +} + +func testArbitrarySizeMultipleMessagesByChunks(t *testing.T, size int) { + rg1 := createRouteGroup() + rg2 := createRouteGroup() + m1, m2, teardownEnv := createTransports(t, rg1, rg2) + + ctx, cancel := context.WithCancel(context.Background()) + + defer func() { + cancel() + teardownEnv() + }() + + go pushPackets(ctx, t, m1, rg1) + + go pushPackets(ctx, t, m2, rg2) + + chunkSize := 1024 + + msg := []byte(strings.Repeat("A", size)) + + for offset := 0; offset < size; offset += chunkSize { + _, err := rg1.Write(msg[offset : offset+chunkSize]) + require.NoError(t, err) + } + + for offset := 0; offset < size; offset += chunkSize { + buf := make([]byte, chunkSize) + n, err := rg2.Read(buf) + require.NoError(t, err) + require.Equal(t, chunkSize, n) + require.Equal(t, msg[offset:offset+chunkSize], buf) + } + + buf := make([]byte, chunkSize) + n, err := rg2.Read(buf) + assert.Equal(t, io.EOF, err) + assert.Equal(t, 0, n) + assert.Equal(t, make([]byte, chunkSize), buf) +} + +func testArbitrarySizeOneMessage(t *testing.T, size int) { + rg1 := createRouteGroup() + rg2 := createRouteGroup() + m1, m2, teardownEnv := createTransports(t, rg1, rg2) + + ctx, cancel := context.WithCancel(context.Background()) + + defer func() { + cancel() + teardownEnv() + }() + + go pushPackets(ctx, t, m1, rg1) + + go pushPackets(ctx, t, m2, rg2) + + msg := []byte(strings.Repeat("A", size)) + + _, err := rg1.Write(msg) + require.NoError(t, err) + + buf := make([]byte, size) + n, err := rg2.Read(buf) + require.NoError(t, err) + require.Equal(t, size, n) + require.Equal(t, msg, buf) + + buf = make([]byte, size) + n, err = rg2.Read(buf) + require.Equal(t, io.EOF, err) + require.Equal(t, 0, n) + require.Equal(t, make([]byte, size), buf) +} + func TestRouteGroup_LocalAddr(t *testing.T) { rg := createRouteGroup() require.Equal(t, rg.desc.Src(), rg.LocalAddr()) From 6a78de70de763832f7a118614a52daab1cb16a07 Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Thu, 28 Nov 2019 21:59:15 +0300 Subject: [PATCH 3/3] Fix panic on visor exit --- pkg/router/router.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/router/router.go b/pkg/router/router.go index 04092041c..baad021bc 100644 --- a/pkg/router/router.go +++ b/pkg/router/router.go @@ -144,6 +144,7 @@ func New(n *snet.Network, config *Config) (Router, error) { rgs: make(map[routing.RouteDescriptor]*RouteGroup), rpcSrv: rpc.NewServer(), accept: make(chan routing.EdgeRules, acceptSize), + done: make(chan struct{}), trustedNodes: trustedNodes, }