Skip to content

Commit

Permalink
Fix closure bug in TestServer_Serve, some minor bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkren committed Jun 11, 2019
1 parent 8ed9e1b commit d60324e
Showing 1 changed file with 28 additions and 33 deletions.
61 changes: 28 additions & 33 deletions pkg/dmsg/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,24 @@ func TestServer_Serve(t *testing.T) {
})

t.Run("test transport establishment concurrently", func(t *testing.T) {
initiatorsCount := 4
remotesCount := 4
initiatorsCount := 50
remotesCount := 50

rand := rand.New(rand.NewSource(time.Now().UnixNano()))

// store the number of transports each remote should handle
usedRemotes := make(map[int]int)
// mapping initiators to remotes
pickedRemotes := make([]int, 0, initiatorsCount)
for i := 0; i < initiatorsCount; i++ {
remote := rand.Intn(remotesCount)
if _, ok := usedRemotes[remote]; !ok {
usedRemotes[remote] = 0
}

usedRemotes[remote] = usedRemotes[remote] + 1
pickedRemotes = append(pickedRemotes, remote)
}

initiators := make([]*Client, 0, initiatorsCount)
remotes := make([]*Client, 0, remotesCount)
Expand All @@ -312,26 +328,11 @@ func TestServer_Serve(t *testing.T) {

c := NewClient(pk, sk, dc)
c.SetLogger(logging.MustGetLogger(fmt.Sprintf("Remote %d", i)))
err := c.InitiateServerConnections(context.Background(), 1)
require.NoError(t, err)

remotes = append(remotes, c)
}

rand := rand.New(rand.NewSource(time.Now().UnixNano()))

// store the number of transports each remote should handle
usedRemotes := make(map[int]int)
// mapping initiators to remotes
pickedRemotes := make([]int, 0, initiatorsCount)
for range initiators {
remote := rand.Intn(remotesCount)
if _, ok := usedRemotes[remote]; !ok {
usedRemotes[remote] = 0
if _, ok := usedRemotes[i]; ok {
err := c.InitiateServerConnections(context.Background(), 1)
require.NoError(t, err)
}

usedRemotes[remote] = usedRemotes[remote] + 1
pickedRemotes = append(pickedRemotes, remote)
remotes = append(remotes, c)
}

totalRemoteTpsCount := 0
Expand All @@ -343,27 +344,25 @@ func TestServer_Serve(t *testing.T) {
remotesTps := make(map[int][]transport.Transport, len(usedRemotes))
var remotesWG sync.WaitGroup
remotesWG.Add(totalRemoteTpsCount)
for i, r := range remotes {
for i := range remotes {
if _, ok := usedRemotes[i]; ok {
for connect := 0; connect < usedRemotes[i]; connect++ {
// run remotes
go func(remoteInd, conn int) {
go func(remoteInd int) {
var (
transport transport.Transport
err error
)

transport, err = r.Accept(context.Background())
transport, err = remotes[remoteInd].Accept(context.Background())
if err != nil {
acceptErrs <- err
}

remotesTps[remoteInd] = append(remotesTps[remoteInd], transport)

log.Printf("Remote %v with conn %v done", remoteInd, conn)

remotesWG.Done()
}(i, connect)
}(i)
}
}
}
Expand All @@ -380,16 +379,14 @@ func TestServer_Serve(t *testing.T) {
err error
)

transport, err = initiators[initiatorInd].Dial(context.Background(),
remotes[pickedRemotes[initiatorInd]].pk)
remote := remotes[pickedRemotes[initiatorInd]]
transport, err = initiators[initiatorInd].Dial(context.Background(), remote.pk)
if err != nil {
dialErrs <- err
}

initiatorsTps = append(initiatorsTps, transport)

log.Printf("Initiator %v done", initiatorInd)

initiatorsWG.Done()
}(i)
}
Expand All @@ -401,8 +398,6 @@ func TestServer_Serve(t *testing.T) {
// single error should fail test
require.NoError(t, err)

log.Printf("Initiators all done")

// wait for remotes
remotesWG.Wait()
close(acceptErrs)
Expand Down

0 comments on commit d60324e

Please sign in to comment.