From 46879d8deebbef4958cd40bd3042ddd07035da87 Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Thu, 30 Jan 2020 21:17:25 +0300 Subject: [PATCH] Refactor route group tests --- pkg/router/route_group.go | 4 +- pkg/router/route_group_test.go | 422 +++++++++------------------------ 2 files changed, 111 insertions(+), 315 deletions(-) diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 252199080..1b821924d 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -194,8 +194,6 @@ func (rg *RouteGroup) Write(p []byte) (n int, err error) { } rg.mu.Lock() - //defer rg.mu.Unlock() - tp, err := rg.tp() if err != nil { rg.mu.Unlock() @@ -207,7 +205,7 @@ func (rg *RouteGroup) Write(p []byte) (n int, err error) { rg.mu.Unlock() return 0, err } - + // we don't need to keep holding mutex from this point on rg.mu.Unlock() return rg.write(p, tp, rule) diff --git a/pkg/router/route_group_test.go b/pkg/router/route_group_test.go index 5d9b27964..ab3aac961 100644 --- a/pkg/router/route_group_test.go +++ b/pkg/router/route_group_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "math/rand" "net" "strconv" "strings" @@ -30,114 +29,45 @@ func TestNewRouteGroup(t *testing.T) { } func TestRouteGroup_Close(t *testing.T) { - keys := snettest.GenKeyPairs(2) - - pk1 := keys[0].PK - pk2 := keys[1].PK - - // create test env - nEnv := snettest.NewEnv(t, keys, []string{stcp.Type}) - defer nEnv.Teardown() - - tpDisc := transport.NewDiscoveryMock() - tpKeys := snettest.GenKeyPairs(2) - - m1, m2, tp1, tp2, err := transport.CreateTransportPair(tpDisc, tpKeys, nEnv, stcp.Type) - require.NoError(t, err) - require.NotNil(t, tp1) - require.NotNil(t, tp2) - require.NotNil(t, tp1.Entry) - require.NotNil(t, tp2.Entry) - - rg0 := createRouteGroup(DefaultRouteGroupConfig()) - rg1 := createRouteGroup(DefaultRouteGroupConfig()) - - // reserve FWD and CNSM IDs for r0. - r0RtIDs, err := rg0.rt.ReserveKeys(2) - require.NoError(t, err) - - // reserve FWD and CNSM IDs for r1. - r1RtIDs, err := rg1.rt.ReserveKeys(2) - require.NoError(t, err) - - r0FwdRule := routing.ForwardRule(ruleKeepAlive, r0RtIDs[0], r1RtIDs[1], tp1.Entry.ID, pk2, pk1, 0, 0) - r0CnsmRule := routing.ConsumeRule(ruleKeepAlive, r0RtIDs[1], pk1, pk2, 0, 0) + rg1, rg2, m1, m2, _, _, teardown := setupEnv(t) - err = rg0.rt.SaveRule(r0FwdRule) - require.NoError(t, err) - err = rg0.rt.SaveRule(r0CnsmRule) - require.NoError(t, err) - - r1FwdRule := routing.ForwardRule(ruleKeepAlive, r1RtIDs[0], r0RtIDs[1], tp2.Entry.ID, pk1, pk2, 0, 0) - r1CnsmRule := routing.ConsumeRule(ruleKeepAlive, r1RtIDs[1], pk2, pk1, 0, 0) - - err = rg1.rt.SaveRule(r1FwdRule) - require.NoError(t, err) - err = rg1.rt.SaveRule(r1CnsmRule) - require.NoError(t, err) - - r0FwdRtDesc := r0FwdRule.RouteDescriptor() - rg0.desc = r0FwdRtDesc.Invert() - rg0.tps = append(rg0.tps, tp1) - rg0.fwd = append(rg0.fwd, r0FwdRule) - - r1FwdRtDesc := r1FwdRule.RouteDescriptor() - rg1.desc = r1FwdRtDesc.Invert() - rg1.tps = append(rg1.tps, tp2) - rg1.fwd = append(rg1.fwd, r1FwdRule) - - // push close packet from transport to route group - go func() { - packet, err := m1.ReadPacket() - if err != nil { - panic(err) - } - - if packet.Type() != routing.ClosePacket { - panic("wrong packet type") - } - - if err := rg0.handleClosePacket(routing.CloseCode(packet.Payload()[0])); err != nil { - panic(err) - } - }() + ctx, cancel := context.WithCancel(context.Background()) // push close packet from transport to route group - go func() { - packet, err := m2.ReadPacket() - if err != nil { - panic(err) - } - - if packet.Type() != routing.ClosePacket { - panic("wrong packet type") - } - - if err := rg1.handleClosePacket(routing.CloseCode(packet.Payload()[0])); err != nil { - panic(err) - } - }() + go pushPackets(ctx, m2, rg2) + go pushPackets(ctx, m1, rg1) - err = rg0.Close() + err := rg1.Close() require.NoError(t, err) - require.True(t, rg0.isClosed()) - require.True(t, rg1.isRemoteClosed()) + require.True(t, rg1.isClosed()) + require.True(t, rg2.isRemoteClosed()) // rg1 should be done (not getting any new data, returning `io.EOF` on further reads) // but not closed - require.False(t, rg1.isClosed()) + require.False(t, rg2.isClosed()) - err = rg0.Close() + err = rg1.Close() require.Equal(t, io.ErrClosedPipe, err) - err = rg1.Close() + err = rg2.Close() require.NoError(t, err) - require.True(t, rg1.isClosed()) + require.True(t, rg2.isClosed()) - err = rg1.Close() + err = rg2.Close() require.Equal(t, io.ErrClosedPipe, err) + + cancel() + teardown() } func TestRouteGroup_Read(t *testing.T) { + rg1, rg2, m1, m2, _, _, teardown := setupEnv(t) + + ctx, cancel := context.WithCancel(context.Background()) + + // push close packet from transport to route group + go pushPackets(ctx, m2, rg2) + go pushPackets(ctx, m1, rg1) + msg1 := []byte("hello1") msg2 := []byte("hello2") msg3 := []byte("hello3") @@ -146,12 +76,6 @@ func TestRouteGroup_Read(t *testing.T) { buf3 := make([]byte, len(msg2)/2) buf4 := make([]byte, len(msg2)/2) - rg1 := createRouteGroup(DefaultRouteGroupConfig()) - rg2 := createRouteGroup(DefaultRouteGroupConfig()) - - _, _, teardown := createTransports(t, rg1, rg2, stcp.Type) - defer teardown() - rg1.readCh <- msg1 rg2.readCh <- msg2 rg2.readCh <- msg3 @@ -183,28 +107,18 @@ func TestRouteGroup_Read(t *testing.T) { require.NoError(t, rg1.Close()) require.NoError(t, rg2.Close()) + cancel() + teardown() } func TestRouteGroup_Write(t *testing.T) { - msg1 := []byte("hello1") - - rg1 := createRouteGroup(DefaultRouteGroupConfig()) - require.NotNil(t, rg1) - - _, err := rg1.Write(msg1) - require.Equal(t, ErrNoTransports, err) - require.NoError(t, rg1.Close()) - - rg1 = createRouteGroup(DefaultRouteGroupConfig()) - rg2 := createRouteGroup(DefaultRouteGroupConfig()) - - m1, m2, teardown := createTransports(t, rg1, rg2, stcp.Type) - defer teardown() + rg1, rg2, m1, m2, _, _, teardown := setupEnv(t) testWrite(t, rg1, rg2, m1, m2) require.NoError(t, rg1.Close()) require.NoError(t, rg2.Close()) + teardown() } func testWrite(t *testing.T, rg1, rg2 *RouteGroup, m1, m2 *transport.Manager) { @@ -264,24 +178,20 @@ func TestRouteGroup_ReadWrite(t *testing.T) { } func testReadWrite(t *testing.T, iterations int) { - rg1 := createRouteGroup(DefaultRouteGroupConfig()) - rg2 := createRouteGroup(DefaultRouteGroupConfig()) - m1, m2, teardownEnv := createTransports(t, rg1, rg2, stcp.Type) + rg1, rg2, m1, m2, _, _, teardown := setupEnv(t) ctx, cancel := context.WithCancel(context.Background()) - go pushPackets(ctx, m1, rg1) - + // push close packet from transport to route group go pushPackets(ctx, m2, rg2) + go pushPackets(ctx, m1, rg1) testRouteGroupReadWrite(t, iterations, rg1, rg2) - cancel() - assert.NoError(t, rg1.Close()) assert.NoError(t, rg2.Close()) - - teardownEnv() + cancel() + teardown() } func testRouteGroupReadWrite(t *testing.T, iterations int, rg1, rg2 io.ReadWriter) { @@ -468,60 +378,17 @@ func TestArbitrarySizeMultipleMessagesByChunks(t *testing.T) { } func testArbitrarySizeMultipleMessagesByChunks(t *testing.T, size int) { - keys := snettest.GenKeyPairs(2) - - pk1 := keys[0].PK - pk2 := keys[1].PK - - // create test env - nEnv := snettest.NewEnv(t, keys, []string{stcp.Type}) - - tpDisc := transport.NewDiscoveryMock() - tpKeys := snettest.GenKeyPairs(2) - - m1, m2, tp1, tp2, err := transport.CreateTransportPair(tpDisc, tpKeys, nEnv, stcp.Type) - require.NoError(t, err) - require.NotNil(t, tp1) - require.NotNil(t, tp2) - require.NotNil(t, tp1.Entry) - require.NotNil(t, tp2.Entry) - - rg0 := createRouteGroup(DefaultRouteGroupConfig()) - rg1 := createRouteGroup(DefaultRouteGroupConfig()) - - r0RtIDs, err := rg0.rt.ReserveKeys(1) - require.NoError(t, err) - - r1RtIDs, err := rg1.rt.ReserveKeys(1) - require.NoError(t, err) - - r0FwdRule := routing.ForwardRule(ruleKeepAlive, r0RtIDs[0], r1RtIDs[0], tp1.Entry.ID, pk2, pk1, 0, 0) - err = rg0.rt.SaveRule(r0FwdRule) - require.NoError(t, err) - - r1FwdRule := routing.ForwardRule(ruleKeepAlive, r1RtIDs[0], r0RtIDs[0], tp2.Entry.ID, pk1, pk2, 0, 0) - err = rg1.rt.SaveRule(r1FwdRule) - require.NoError(t, err) - - r0FwdRtDesc := r0FwdRule.RouteDescriptor() - rg0.desc = r0FwdRtDesc.Invert() - rg0.tps = append(rg0.tps, tp1) - rg0.fwd = append(rg0.fwd, r0FwdRule) - - r1FwdRtDesc := r1FwdRule.RouteDescriptor() - rg1.desc = r1FwdRtDesc.Invert() - rg1.tps = append(rg1.tps, tp2) - rg1.fwd = append(rg1.fwd, r1FwdRule) + rg1, rg2, m1, m2, _, _, teardown := setupEnv(t) ctx, cancel := context.WithCancel(context.Background()) // push close packet from transport to route group - go pushPackets(ctx, m2, rg1) - go pushPackets(ctx, m1, rg0) + go pushPackets(ctx, m2, rg2) + go pushPackets(ctx, m1, rg1) defer func() { cancel() - nEnv.Teardown() + teardown() }() chunkSize := 1024 @@ -529,33 +396,35 @@ func testArbitrarySizeMultipleMessagesByChunks(t *testing.T, size int) { msg := []byte(strings.Repeat("A", size)) for offset := 0; offset < size; offset += chunkSize { - _, err := rg0.Write(msg[offset : offset+chunkSize]) + _, err := rg1.Write(msg[offset : offset+chunkSize]) require.NoError(t, err) } for offset := 0; offset < size; offset += chunkSize { buf := make([]byte, chunkSize) - n, err := rg1.Read(buf) + n, err := rg2.Read(buf) require.NoError(t, err) require.Equal(t, chunkSize, n) require.Equal(t, msg[offset:offset+chunkSize], buf) } - errCh := make(chan error) - nCh := make(chan int) - bufCh := make(chan []byte) + var ( + errCh = make(chan error) + nCh = make(chan int) + bufCh = make(chan []byte) + ) go func() { buf := make([]byte, size) - n, err := rg1.Read(buf) + n, err := rg2.Read(buf) errCh <- err nCh <- n bufCh <- buf }() // close remote to simulate `io.EOF` on local connection - require.NoError(t, rg0.Close()) + require.NoError(t, rg1.Close()) - err = <-errCh + err := <-errCh n := <-nCh readBuf := <-bufCh close(nCh) @@ -565,90 +434,49 @@ func testArbitrarySizeMultipleMessagesByChunks(t *testing.T, size int) { require.Equal(t, 0, n) require.Equal(t, make([]byte, size), readBuf) - require.NoError(t, rg1.Close()) + require.NoError(t, rg2.Close()) } func testArbitrarySizeOneMessage(t *testing.T, size int) { - keys := snettest.GenKeyPairs(2) - - pk1 := keys[0].PK - pk2 := keys[1].PK - - // create test env - nEnv := snettest.NewEnv(t, keys, []string{stcp.Type}) - - tpDisc := transport.NewDiscoveryMock() - tpKeys := snettest.GenKeyPairs(2) - - m1, m2, tp1, tp2, err := transport.CreateTransportPair(tpDisc, tpKeys, nEnv, stcp.Type) - require.NoError(t, err) - require.NotNil(t, tp1) - require.NotNil(t, tp2) - require.NotNil(t, tp1.Entry) - require.NotNil(t, tp2.Entry) - - rg0 := createRouteGroup(DefaultRouteGroupConfig()) - rg1 := createRouteGroup(DefaultRouteGroupConfig()) - - r0RtIDs, err := rg0.rt.ReserveKeys(1) - require.NoError(t, err) - - r1RtIDs, err := rg1.rt.ReserveKeys(1) - require.NoError(t, err) - - r0FwdRule := routing.ForwardRule(ruleKeepAlive, r0RtIDs[0], r1RtIDs[0], tp1.Entry.ID, pk2, pk1, 0, 0) - err = rg0.rt.SaveRule(r0FwdRule) - require.NoError(t, err) - - r1FwdRule := routing.ForwardRule(ruleKeepAlive, r1RtIDs[0], r0RtIDs[0], tp2.Entry.ID, pk1, pk2, 0, 0) - err = rg1.rt.SaveRule(r1FwdRule) - require.NoError(t, err) - - r0FwdRtDesc := r0FwdRule.RouteDescriptor() - rg0.desc = r0FwdRtDesc.Invert() - rg0.tps = append(rg0.tps, tp1) - rg0.fwd = append(rg0.fwd, r0FwdRule) - - r1FwdRtDesc := r1FwdRule.RouteDescriptor() - rg1.desc = r1FwdRtDesc.Invert() - rg1.tps = append(rg1.tps, tp2) - rg1.fwd = append(rg1.fwd, r1FwdRule) + rg1, rg2, m1, m2, _, _, teardown := setupEnv(t) ctx, cancel := context.WithCancel(context.Background()) // push close packet from transport to route group - go pushPackets(ctx, m2, rg1) - go pushPackets(ctx, m1, rg0) + go pushPackets(ctx, m2, rg2) + go pushPackets(ctx, m1, rg1) defer func() { cancel() - nEnv.Teardown() + teardown() }() msg := []byte(strings.Repeat("A", size)) - _, err = rg0.Write(msg) + _, err := rg1.Write(msg) require.NoError(t, err) buf := make([]byte, size) - n, err := rg1.Read(buf) + n, err := rg2.Read(buf) require.NoError(t, err) require.Equal(t, size, n) require.Equal(t, msg, buf) - errCh := make(chan error) - nCh := make(chan int) - bufCh := make(chan []byte) + var ( + errCh = make(chan error) + nCh = make(chan int) + bufCh = make(chan []byte) + ) go func() { buf := make([]byte, size) - n, err := rg1.Read(buf) + n, err := rg2.Read(buf) errCh <- err nCh <- n bufCh <- buf }() // close remote to simulate `io.EOF` on local connection - require.NoError(t, rg0.Close()) + require.NoError(t, rg1.Close()) err = <-errCh n = <-nCh @@ -660,7 +488,7 @@ func testArbitrarySizeOneMessage(t *testing.T, size int) { require.Equal(t, 0, n) require.Equal(t, make([]byte, size), readBuf) - require.NoError(t, rg1.Close()) + require.NoError(t, rg2.Close()) } func TestRouteGroup_LocalAddr(t *testing.T) { @@ -677,75 +505,24 @@ func TestRouteGroup_RemoteAddr(t *testing.T) { require.NoError(t, rg.Close()) } -// TODO (Darkren): uncomment and fix func TestRouteGroup_TestConn(t *testing.T) { mp := func() (c1, c2 net.Conn, stop func(), err error) { - keys := snettest.GenKeyPairs(2) - - pk1 := keys[0].PK - pk2 := keys[1].PK - - // create test env - nEnv := snettest.NewEnv(t, keys, []string{stcp.Type}) - - tpDisc := transport.NewDiscoveryMock() - tpKeys := snettest.GenKeyPairs(2) - - m1, m2, tp1, tp2, err := transport.CreateTransportPair(tpDisc, tpKeys, nEnv, stcp.Type) - require.NoError(t, err) - require.NotNil(t, tp1) - require.NotNil(t, tp2) - require.NotNil(t, tp1.Entry) - require.NotNil(t, tp2.Entry) - - // because some subtests of `TestConn` are highly specific in their behavior, - // it's best to exceed the `readCh` size - rgCfg := &RouteGroupConfig{ - ReadChBufSize: defaultReadChBufSize * 3, - KeepAliveInterval: defaultRouteGroupKeepAliveInterval, - } - - rg0 := createRouteGroup(rgCfg) - rg1 := createRouteGroup(rgCfg) - - r0RtIDs, err := rg0.rt.ReserveKeys(1) - require.NoError(t, err) - - r1RtIDs, err := rg1.rt.ReserveKeys(1) - require.NoError(t, err) - - r0FwdRule := routing.ForwardRule(ruleKeepAlive, r0RtIDs[0], r1RtIDs[0], tp1.Entry.ID, pk2, pk1, 0, 0) - err = rg0.rt.SaveRule(r0FwdRule) - require.NoError(t, err) - - r1FwdRule := routing.ForwardRule(ruleKeepAlive, r1RtIDs[0], r0RtIDs[0], tp2.Entry.ID, pk1, pk2, 0, 0) - err = rg1.rt.SaveRule(r1FwdRule) - require.NoError(t, err) - - r0FwdRtDesc := r0FwdRule.RouteDescriptor() - rg0.desc = r0FwdRtDesc.Invert() - rg0.tps = append(rg0.tps, tp1) - rg0.fwd = append(rg0.fwd, r0FwdRule) - - r1FwdRtDesc := r1FwdRule.RouteDescriptor() - rg1.desc = r1FwdRtDesc.Invert() - rg1.tps = append(rg1.tps, tp2) - rg1.fwd = append(rg1.fwd, r1FwdRule) + rg1, rg2, m1, m2, _, _, teardown := setupEnv(t) ctx, cancel := context.WithCancel(context.Background()) // push close packet from transport to route group - go pushPackets(ctx, m2, rg1) - go pushPackets(ctx, m1, rg0) + go pushPackets(ctx, m2, rg2) + go pushPackets(ctx, m1, rg1) stop = func() { - _ = rg0.Close() // nolint:errcheck _ = rg1.Close() // nolint:errcheck + _ = rg2.Close() // nolint:errcheck cancel() - nEnv.Teardown() + teardown() } - return rg0, rg1, stop, nil + return rg1, rg2, stop, nil } nettest.TestConn(t, mp) @@ -771,7 +548,6 @@ func pushPackets(ctx context.Context, from *transport.Manager, to *RouteGroup) { switch packet.Type() { case routing.ClosePacket: - fmt.Printf("GOT CLOSE PACKET ON %s\n", to.LocalAddr().String()) if to.isClosed() { panic(io.ErrClosedPipe) } @@ -782,7 +558,6 @@ func pushPackets(ctx context.Context, from *transport.Manager, to *RouteGroup) { return case routing.DataPacket: - //fmt.Println("GOT DATA PACKET") if !safeSend(ctx, to, payload) { return } @@ -827,40 +602,63 @@ func createRouteGroup(cfg *RouteGroupConfig) *RouteGroup { return rg } -// nolint:unparam -func createTransports(t *testing.T, rg1, rg2 *RouteGroup, network string) (m1, m2 *transport.Manager, teardown func()) { - tpDisc := transport.NewDiscoveryMock() +func setupEnv(t *testing.T) (rg1, rg2 *RouteGroup, m1, m2 *transport.Manager, + tp1, tp2 *transport.ManagedTransport, teardown func()) { keys := snettest.GenKeyPairs(2) - nEnv := snettest.NewEnv(t, keys, []string{network}) + pk1 := keys[0].PK + pk2 := keys[1].PK + + // create test env + nEnv := snettest.NewEnv(t, keys, []string{stcp.Type}) + + tpDisc := transport.NewDiscoveryMock() + tpKeys := snettest.GenKeyPairs(2) - m1, m2, tp1, tp2, err := transport.CreateTransportPair(tpDisc, keys, nEnv, network) + m1, m2, tp1, tp2, err := transport.CreateTransportPair(tpDisc, tpKeys, nEnv, stcp.Type) require.NoError(t, err) require.NotNil(t, tp1) require.NotNil(t, tp2) require.NotNil(t, tp1.Entry) require.NotNil(t, tp2.Entry) - keepAlive := 1 * time.Hour - // TODO: remove rand - id1 := routing.RouteID(rand.Int()) // nolint: gosec - id2 := routing.RouteID(rand.Int()) // nolint: gosec - port1 := routing.Port(1) - port2 := routing.Port(2) - rule1 := routing.ForwardRule(keepAlive, id1, id2, tp2.Entry.ID, keys[0].PK, keys[1].PK, port1, port2) - rule2 := routing.ForwardRule(keepAlive, id2, id1, tp1.Entry.ID, keys[1].PK, keys[0].PK, port2, port1) + // because some subtests of `TestConn` are highly specific in their behavior, + // it's best to exceed the `readCh` size + rgCfg := &RouteGroupConfig{ + ReadChBufSize: defaultReadChBufSize * 3, + KeepAliveInterval: defaultRouteGroupKeepAliveInterval, + } + + rg1 = createRouteGroup(rgCfg) + rg2 = createRouteGroup(rgCfg) - rg1.mu.Lock() + r1RtIDs, err := rg1.rt.ReserveKeys(1) + require.NoError(t, err) + + r2RtIDs, err := rg2.rt.ReserveKeys(1) + require.NoError(t, err) + + r1FwdRule := routing.ForwardRule(ruleKeepAlive, r1RtIDs[0], r2RtIDs[0], tp1.Entry.ID, pk2, pk1, 0, 0) + err = rg1.rt.SaveRule(r1FwdRule) + require.NoError(t, err) + + r2FwdRule := routing.ForwardRule(ruleKeepAlive, r2RtIDs[0], r1RtIDs[0], tp2.Entry.ID, pk1, pk2, 0, 0) + err = rg2.rt.SaveRule(r2FwdRule) + require.NoError(t, err) + + r1FwdRtDesc := r1FwdRule.RouteDescriptor() + rg1.desc = r1FwdRtDesc.Invert() rg1.tps = append(rg1.tps, tp1) - rg1.fwd = append(rg1.fwd, rule1) - rg1.mu.Unlock() + rg1.fwd = append(rg1.fwd, r1FwdRule) - rg2.mu.Lock() + r2FwdRtDesc := r2FwdRule.RouteDescriptor() + rg2.desc = r2FwdRtDesc.Invert() rg2.tps = append(rg2.tps, tp2) - rg2.fwd = append(rg2.fwd, rule2) - rg2.mu.Unlock() + rg2.fwd = append(rg2.fwd, r2FwdRule) - return m1, m2, func() { + teardown = func() { nEnv.Teardown() } + + return rg1, rg2, m1, m2, tp1, tp2, teardown }