From da5ff4145ed0507795909dec966dcc07691469d6 Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Wed, 29 Jan 2020 15:10:18 +0300 Subject: [PATCH] Fix `TestArbitrarySizeOneMessage` --- pkg/router/route_group.go | 6 +++-- pkg/router/route_group_test.go | 40 +++++++++++++++++++++------------- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/pkg/router/route_group.go b/pkg/router/route_group.go index 02be1b1527..5f2e530e7c 100644 --- a/pkg/router/route_group.go +++ b/pkg/router/route_group.go @@ -416,11 +416,11 @@ func (rg *RouteGroup) close(code routing.CloseCode) error { } func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error { - rg.logger.Infof("Got close packet with code %d", code) + rg.logger.Infof("Got close packet with code %d on %s", code, rg.LocalAddr().String()) if rg.isCloseInitiator() { // this route group initiated close loop and got response - rg.logger.Debugf("Handling response close packet with code %d", code) + rg.logger.Debugf("Handling response close packet with code %d on %s", code, rg.LocalAddr().String()) rg.closeDone.Done() return nil @@ -431,6 +431,8 @@ func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error { } func (rg *RouteGroup) broadcastClosePackets(code routing.CloseCode) error { + rg.logger.Infof("Broadcasting Close packets to %d addresses", len(rg.tps)) + for i := 0; i < len(rg.tps); i++ { packet := routing.MakeClosePacket(rg.fwd[i].KeyRouteID(), code) if err := rg.tps[i].WritePacket(context.Background(), packet); err != nil { diff --git a/pkg/router/route_group_test.go b/pkg/router/route_group_test.go index 1e0f012ee5..82c4aa0907 100644 --- a/pkg/router/route_group_test.go +++ b/pkg/router/route_group_test.go @@ -438,9 +438,9 @@ func TestArbitrarySizeOneMessage(t *testing.T) { wg.Wait() - /*t.Run("Value2", func(t *testing.T) { + t.Run("Value2", func(t *testing.T) { testArbitrarySizeOneMessage(t, value2) - })*/ + }) } func TestArbitrarySizeMultipleMessagesByChunks(t *testing.T) { @@ -526,7 +526,9 @@ func testArbitrarySizeOneMessage(t *testing.T, size int) { require.NotNil(t, tp2.Entry) rg0 := createRouteGroup() + fmt.Printf("RG0 Addr: %s\n", rg0.LocalAddr().String()) rg1 := createRouteGroup() + fmt.Printf("RG Addr: %s\n", rg1.LocalAddr().String()) r0RtIDs, err := rg0.rt.ReserveKeys(1) require.NoError(t, err) @@ -558,20 +560,11 @@ func testArbitrarySizeOneMessage(t *testing.T, size int) { go pushPackets(ctx, m2, rg1) go pushPackets(ctx, m1, rg0) - /*rg1 := createRouteGroup() - rg2 := createRouteGroup() - m1, m2, teardownEnv := createTransports(t, rg1, rg2, stcp.Type)*/ - - //ctx, cancel := context.WithCancel(context.Background()) - defer func() { cancel() nEnv.Teardown() }() - go pushPackets(ctx, m1, rg0) - go pushPackets(ctx, m2, rg1) - msg := []byte(strings.Repeat("A", size)) _, err = rg0.Write(msg) @@ -583,13 +576,30 @@ func testArbitrarySizeOneMessage(t *testing.T, size int) { require.Equal(t, size, n) require.Equal(t, msg, buf) - buf = make([]byte, size) - n, err = rg1.Read(buf) + errCh := make(chan error) + nCh := make(chan int) + bufCh := make(chan []byte) + go func() { + buf := make([]byte, size) + n, err := rg1.Read(buf) + errCh <- err + nCh <- n + bufCh <- buf + }() + + // close remote to simulate `io.EOF` on local connection + require.NoError(t, rg0.Close()) + + err = <-errCh + n = <-nCh + readBuf := <-bufCh + close(nCh) + close(errCh) + close(bufCh) require.Equal(t, io.EOF, err) require.Equal(t, 0, n) - require.Equal(t, make([]byte, size), buf) + require.Equal(t, make([]byte, size), readBuf) - require.NoError(t, rg0.Close()) require.NoError(t, rg1.Close()) }