Skip to content

Commit

Permalink
Fix TestArbitrarySizeOneMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkren committed Jan 29, 2020
1 parent de604ea commit da5ff41
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 17 deletions.
6 changes: 4 additions & 2 deletions pkg/router/route_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,11 @@ func (rg *RouteGroup) close(code routing.CloseCode) error {
}

func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error {
rg.logger.Infof("Got close packet with code %d", code)
rg.logger.Infof("Got close packet with code %d on %s", code, rg.LocalAddr().String())

if rg.isCloseInitiator() {
// this route group initiated close loop and got response
rg.logger.Debugf("Handling response close packet with code %d", code)
rg.logger.Debugf("Handling response close packet with code %d on %s", code, rg.LocalAddr().String())

rg.closeDone.Done()
return nil
Expand All @@ -431,6 +431,8 @@ func (rg *RouteGroup) handleClosePacket(code routing.CloseCode) error {
}

func (rg *RouteGroup) broadcastClosePackets(code routing.CloseCode) error {
rg.logger.Infof("Broadcasting Close packets to %d addresses", len(rg.tps))

for i := 0; i < len(rg.tps); i++ {
packet := routing.MakeClosePacket(rg.fwd[i].KeyRouteID(), code)
if err := rg.tps[i].WritePacket(context.Background(), packet); err != nil {
Expand Down
40 changes: 25 additions & 15 deletions pkg/router/route_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,9 @@ func TestArbitrarySizeOneMessage(t *testing.T) {

wg.Wait()

/*t.Run("Value2", func(t *testing.T) {
t.Run("Value2", func(t *testing.T) {
testArbitrarySizeOneMessage(t, value2)
})*/
})
}

func TestArbitrarySizeMultipleMessagesByChunks(t *testing.T) {
Expand Down Expand Up @@ -526,7 +526,9 @@ func testArbitrarySizeOneMessage(t *testing.T, size int) {
require.NotNil(t, tp2.Entry)

rg0 := createRouteGroup()
fmt.Printf("RG0 Addr: %s\n", rg0.LocalAddr().String())
rg1 := createRouteGroup()
fmt.Printf("RG Addr: %s\n", rg1.LocalAddr().String())

r0RtIDs, err := rg0.rt.ReserveKeys(1)
require.NoError(t, err)
Expand Down Expand Up @@ -558,20 +560,11 @@ func testArbitrarySizeOneMessage(t *testing.T, size int) {
go pushPackets(ctx, m2, rg1)
go pushPackets(ctx, m1, rg0)

/*rg1 := createRouteGroup()
rg2 := createRouteGroup()
m1, m2, teardownEnv := createTransports(t, rg1, rg2, stcp.Type)*/

//ctx, cancel := context.WithCancel(context.Background())

defer func() {
cancel()
nEnv.Teardown()
}()

go pushPackets(ctx, m1, rg0)
go pushPackets(ctx, m2, rg1)

msg := []byte(strings.Repeat("A", size))

_, err = rg0.Write(msg)
Expand All @@ -583,13 +576,30 @@ func testArbitrarySizeOneMessage(t *testing.T, size int) {
require.Equal(t, size, n)
require.Equal(t, msg, buf)

buf = make([]byte, size)
n, err = rg1.Read(buf)
errCh := make(chan error)
nCh := make(chan int)
bufCh := make(chan []byte)
go func() {
buf := make([]byte, size)
n, err := rg1.Read(buf)
errCh <- err
nCh <- n
bufCh <- buf
}()

// close remote to simulate `io.EOF` on local connection
require.NoError(t, rg0.Close())

err = <-errCh
n = <-nCh
readBuf := <-bufCh
close(nCh)
close(errCh)
close(bufCh)
require.Equal(t, io.EOF, err)
require.Equal(t, 0, n)
require.Equal(t, make([]byte, size), buf)
require.Equal(t, make([]byte, size), readBuf)

require.NoError(t, rg0.Close())
require.NoError(t, rg1.Close())
}

Expand Down

0 comments on commit da5ff41

Please sign in to comment.