Skip to content

Commit

Permalink
Fix testPresentTimeout subtest of TestConn, fix possible memory leak
Browse files Browse the repository at this point in the history
Regarding memory leak. In the `Write` method of the route group we call
`writePacketAsync`. It returns `errCh` to us to wait the error from, this
channel is not buffered, so `writePacketAsync`'s goroutine actually blocks
while trying to pass error to this channel. But `Write` blocks on `select`
waiting either error from `errCh` or error from write deadline struct. And
if it gets deadline error, it just returns without waiting for any error from
`errCh`, making `writePacketAsync` block forever on sending to the channel.
And we might have had a lot of such running goroutines
  • Loading branch information
Darkren committed Jan 29, 2020
1 parent 8170ef0 commit 42b9559
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 29 deletions.
10 changes: 8 additions & 2 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,14 @@ func (rg *RouteGroup) Write(p []byte) (n int, err error) {
func (rg *RouteGroup) writePacketAsync(ctx context.Context, tp *transport.ManagedTransport, packet routing.Packet) chan error {
errCh := make(chan error)
go func() {
errCh <- tp.WritePacket(ctx, packet)
close(errCh)
defer close(errCh)
err := tp.WritePacket(ctx, packet)
select {
case <-ctx.Done():
return
case errCh <- err:
return
}
}()

return errCh
Expand Down
58 changes: 32 additions & 26 deletions pkg/router/route_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package router
import (
"context"
"fmt"
"golang.org/x/net/nettest"
"io"
"math/rand"
"net"
Expand All @@ -16,6 +15,7 @@ import (
"github.com/SkycoinProject/dmsg/cipher"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/nettest"

"github.com/SkycoinProject/skywire-mainnet/pkg/routing"
"github.com/SkycoinProject/skywire-mainnet/pkg/snet/snettest"
Expand All @@ -24,7 +24,7 @@ import (
)

func TestNewRouteGroup(t *testing.T) {
rg := createRouteGroup()
rg := createRouteGroup(DefaultRouteGroupConfig())
require.NotNil(t, rg)
require.Equal(t, DefaultRouteGroupConfig(), rg.cfg)
}
Expand All @@ -49,8 +49,8 @@ func TestRouteGroup_Close(t *testing.T) {
require.NotNil(t, tp1.Entry)
require.NotNil(t, tp2.Entry)

rg0 := createRouteGroup()
rg1 := createRouteGroup()
rg0 := createRouteGroup(DefaultRouteGroupConfig())
rg1 := createRouteGroup(DefaultRouteGroupConfig())

// reserve FWD and CNSM IDs for r0.
r0RtIDs, err := rg0.rt.ReserveKeys(2)
Expand Down Expand Up @@ -146,8 +146,8 @@ func TestRouteGroup_Read(t *testing.T) {
buf3 := make([]byte, len(msg2)/2)
buf4 := make([]byte, len(msg2)/2)

rg1 := createRouteGroup()
rg2 := createRouteGroup()
rg1 := createRouteGroup(DefaultRouteGroupConfig())
rg2 := createRouteGroup(DefaultRouteGroupConfig())

_, _, teardown := createTransports(t, rg1, rg2, stcp.Type)
defer teardown()
Expand Down Expand Up @@ -188,15 +188,15 @@ func TestRouteGroup_Read(t *testing.T) {
func TestRouteGroup_Write(t *testing.T) {
msg1 := []byte("hello1")

rg1 := createRouteGroup()
rg1 := createRouteGroup(DefaultRouteGroupConfig())
require.NotNil(t, rg1)

_, err := rg1.Write(msg1)
require.Equal(t, ErrNoTransports, err)
require.NoError(t, rg1.Close())

rg1 = createRouteGroup()
rg2 := createRouteGroup()
rg1 = createRouteGroup(DefaultRouteGroupConfig())
rg2 := createRouteGroup(DefaultRouteGroupConfig())

m1, m2, teardown := createTransports(t, rg1, rg2, stcp.Type)
defer teardown()
Expand Down Expand Up @@ -264,8 +264,8 @@ func TestRouteGroup_ReadWrite(t *testing.T) {
}

func testReadWrite(t *testing.T, iterations int) {
rg1 := createRouteGroup()
rg2 := createRouteGroup()
rg1 := createRouteGroup(DefaultRouteGroupConfig())
rg2 := createRouteGroup(DefaultRouteGroupConfig())
m1, m2, teardownEnv := createTransports(t, rg1, rg2, stcp.Type)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -486,10 +486,8 @@ func testArbitrarySizeMultipleMessagesByChunks(t *testing.T, size int) {
require.NotNil(t, tp1.Entry)
require.NotNil(t, tp2.Entry)

rg0 := createRouteGroup()
fmt.Printf("RG0 Addr: %s\n", rg0.LocalAddr().String())
rg1 := createRouteGroup()
fmt.Printf("RG Addr: %s\n", rg1.LocalAddr().String())
rg0 := createRouteGroup(DefaultRouteGroupConfig())
rg1 := createRouteGroup(DefaultRouteGroupConfig())

r0RtIDs, err := rg0.rt.ReserveKeys(1)
require.NoError(t, err)
Expand Down Expand Up @@ -589,10 +587,8 @@ func testArbitrarySizeOneMessage(t *testing.T, size int) {
require.NotNil(t, tp1.Entry)
require.NotNil(t, tp2.Entry)

rg0 := createRouteGroup()
fmt.Printf("RG0 Addr: %s\n", rg0.LocalAddr().String())
rg1 := createRouteGroup()
fmt.Printf("RG Addr: %s\n", rg1.LocalAddr().String())
rg0 := createRouteGroup(DefaultRouteGroupConfig())
rg1 := createRouteGroup(DefaultRouteGroupConfig())

r0RtIDs, err := rg0.rt.ReserveKeys(1)
require.NoError(t, err)
Expand Down Expand Up @@ -668,14 +664,14 @@ func testArbitrarySizeOneMessage(t *testing.T, size int) {
}

func TestRouteGroup_LocalAddr(t *testing.T) {
rg := createRouteGroup()
rg := createRouteGroup(DefaultRouteGroupConfig())
require.Equal(t, rg.desc.Dst(), rg.LocalAddr())

require.NoError(t, rg.Close())
}

func TestRouteGroup_RemoteAddr(t *testing.T) {
rg := createRouteGroup()
rg := createRouteGroup(DefaultRouteGroupConfig())
require.Equal(t, rg.desc.Src(), rg.RemoteAddr())

require.NoError(t, rg.Close())
Expand All @@ -702,8 +698,15 @@ func TestRouteGroup_TestConn(t *testing.T) {
require.NotNil(t, tp1.Entry)
require.NotNil(t, tp2.Entry)

rg0 := createRouteGroup()
rg1 := createRouteGroup()
// because some subtests of `TestConn` are highly specific in their behaviour,
// it's best to exceed the `readCh` size
rgCfg := &RouteGroupConfig{
ReadChBufSize: defaultReadChBufSize * 3,
KeepAliveInterval: defaultRouteGroupKeepAliveInterval,
}

rg0 := createRouteGroup(rgCfg)
rg1 := createRouteGroup(rgCfg)

r0RtIDs, err := rg0.rt.ReserveKeys(1)
require.NoError(t, err)
Expand Down Expand Up @@ -739,9 +742,11 @@ func TestRouteGroup_TestConn(t *testing.T) {
if err := rg0.Close(); err != nil {
//panic(err)
}
fmt.Printf("CLOSED 0 %s\n", rg0.LocalAddr().String())
if err := rg1.Close(); err != nil {
//panic(err)
}
fmt.Printf("CLOSED 1 %s\n", rg1.LocalAddr().String())
cancel()
nEnv.Teardown()
}
Expand Down Expand Up @@ -772,9 +777,10 @@ func pushPackets(ctx context.Context, from *transport.Manager, to *RouteGroup) {

switch packet.Type() {
case routing.ClosePacket:
fmt.Printf("GOT CLOSE PACKET ON %s\n", to.LocalAddr().String())
if to.isClosed() {
// TODO: this panic rises on some subtests of `TestConn`, need to find out the reason
panic(io.ErrClosedPipe)
return
}

if err := to.handleClosePacket(routing.CloseCode(packet.Payload()[0])); err != nil {
Expand Down Expand Up @@ -813,7 +819,7 @@ func safeSend(ctx context.Context, to *RouteGroup, payload []byte) (keepSending
}
}

func createRouteGroup() *RouteGroup {
func createRouteGroup(cfg *RouteGroupConfig) *RouteGroup {
rt := routing.NewTable(routing.DefaultConfig())

pk1, _ := cipher.GenerateKeyPair()
Expand All @@ -822,7 +828,7 @@ func createRouteGroup() *RouteGroup {
port2 := routing.Port(2)
desc := routing.NewRouteDescriptor(pk1, pk2, port1, port2)

rg := NewRouteGroup(nil, rt, desc)
rg := NewRouteGroup(cfg, rt, desc)

return rg
}
Expand Down
2 changes: 1 addition & 1 deletion vendor/golang.org/x/net/nettest/conntest.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 42b9559

Please sign in to comment.