From 379a82c0830b61c3a2c3c6219a3c20bf84588242 Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Mon, 17 Jun 2019 19:01:56 +0300 Subject: [PATCH 01/11] Add `Dial` loop for the failed transport --- pkg/dmsg/server_test.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/pkg/dmsg/server_test.go b/pkg/dmsg/server_test.go index e0886c99dc..5b0bd8825b 100644 --- a/pkg/dmsg/server_test.go +++ b/pkg/dmsg/server_test.go @@ -586,7 +586,7 @@ func TestServer_Serve(t *testing.T) { tpReadWriteWG.Done() return default: - msg := make([]byte, 13) + var msg []byte if _, aErr = aTransport.Read(msg); aErr != nil { tpReadWriteWG.Done() return @@ -614,12 +614,17 @@ func TestServer_Serve(t *testing.T) { } }() - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() + // continue creating transports untill the error occurs + for { + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - // try to create another transport - _, err = a.Dial(ctx, bPK) - // must fail with timeout + _, err = a.Dial(ctx, bPK) + cancel() + if err != nil { + break + } + } + // must be error require.Error(t, err) // wait more time to ensure that the initially created transport works From 0d37f4fc65978bee57e859923729dd7dba9dd1e5 Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Mon, 17 Jun 2019 20:26:28 +0300 Subject: [PATCH 02/11] Add profiling for failed transport test --- pkg/dmsg/server_test.go | 37 ++++++++++++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/pkg/dmsg/server_test.go b/pkg/dmsg/server_test.go index 5b0bd8825b..5e461d4990 100644 --- a/pkg/dmsg/server_test.go +++ b/pkg/dmsg/server_test.go @@ -8,6 +8,9 @@ import ( "math" "math/rand" "net" + "os" + "runtime" + "runtime/pprof" "sync" "testing" "time" @@ -541,6 +544,30 @@ func TestServer_Serve(t *testing.T) { }) t.Run("test failed accept not hanging already established transport", func(t *testing.T) { + f, err := os.Create("./cpu.prof") + if err != nil { + log.Fatalf("Error creating cpu profile: %v\n", err) + } + + if err := pprof.StartCPUProfile(f); err != nil { + log.Fatalf("Error starting cpu profiling: %v\n", err) + } + + defer pprof.StopCPUProfile() + + blockF, err := os.Create("./block.prof") + if err != nil { + log.Fatalf("Error creating block profile: %v\n", err) + } + + runtime.SetBlockProfileRate(1) + p := pprof.Lookup("block") + defer func() { + if err := p.WriteTo(blockF, 0); err != nil { + log.Fatalf("Error saving block profile: %v\n", err) + } + }() + // generate keys for both clients aPK, aSK := cipher.GenerateKeyPair() bPK, bSK := cipher.GenerateKeyPair() @@ -548,7 +575,7 @@ func TestServer_Serve(t *testing.T) { // create remote a := NewClient(aPK, aSK, dc) a.SetLogger(logging.MustGetLogger("A")) - err := a.InitiateServerConnections(context.Background(), 1) + err = a.InitiateServerConnections(context.Background(), 1) require.NoError(t, err) // create initiator @@ -586,12 +613,13 @@ func TestServer_Serve(t *testing.T) { tpReadWriteWG.Done() return default: - var msg []byte + //var msg []byte + msg := make([]byte, 13) if _, aErr = aTransport.Read(msg); aErr != nil { tpReadWriteWG.Done() return } - log.Printf("GOT MESSAGE %s", string(msg)) + //log.Printf("GOT MESSAGE %s", string(msg)) } } }() @@ -606,10 +634,13 @@ func TestServer_Serve(t *testing.T) { return default: msg := []byte("Hello there!") + log.Println("BEFORE TRANSPORT WRITE") if _, bErr = bTransport.Write(msg); bErr != nil { + log.Println("ERROR IN TRANSPORT WRITE") tpReadWriteWG.Done() return } + log.Println("AFTER TRANSPORT WRITE") } } }() From b14212e39e1de022844c6564a24294912a695a85 Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Tue, 18 Jun 2019 11:21:21 +0300 Subject: [PATCH 03/11] Add demonstration test --- pkg/dmsg/server_test.go | 147 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 142 insertions(+), 5 deletions(-) diff --git a/pkg/dmsg/server_test.go b/pkg/dmsg/server_test.go index 5e461d4990..d3860bc733 100644 --- a/pkg/dmsg/server_test.go +++ b/pkg/dmsg/server_test.go @@ -8,9 +8,6 @@ import ( "math" "math/rand" "net" - "os" - "runtime" - "runtime/pprof" "sync" "testing" "time" @@ -544,7 +541,7 @@ func TestServer_Serve(t *testing.T) { }) t.Run("test failed accept not hanging already established transport", func(t *testing.T) { - f, err := os.Create("./cpu.prof") + /*f, err := os.Create("./cpu.prof") if err != nil { log.Fatalf("Error creating cpu profile: %v\n", err) } @@ -566,7 +563,7 @@ func TestServer_Serve(t *testing.T) { if err := p.WriteTo(blockF, 0); err != nil { log.Fatalf("Error saving block profile: %v\n", err) } - }() + }()*/ // generate keys for both clients aPK, aSK := cipher.GenerateKeyPair() @@ -683,6 +680,146 @@ func TestServer_Serve(t *testing.T) { err = b.Close() require.NoError(t, err) }) + + t.Run("test failed accept not hanging already established transport 2", func(t *testing.T) { + /*f, err := os.Create("./cpu.prof") + if err != nil { + log.Fatalf("Error creating cpu profile: %v\n", err) + } + + if err := pprof.StartCPUProfile(f); err != nil { + log.Fatalf("Error starting cpu profiling: %v\n", err) + } + + defer pprof.StopCPUProfile() + + blockF, err := os.Create("./block.prof") + if err != nil { + log.Fatalf("Error creating block profile: %v\n", err) + } + + runtime.SetBlockProfileRate(1) + p := pprof.Lookup("block") + defer func() { + if err := p.WriteTo(blockF, 0); err != nil { + log.Fatalf("Error saving block profile: %v\n", err) + } + }()*/ + + // generate keys for both clients + aPK, aSK := cipher.GenerateKeyPair() + bPK, bSK := cipher.GenerateKeyPair() + + // create remote + a := NewClient(aPK, aSK, dc) + a.SetLogger(logging.MustGetLogger("A")) + err = a.InitiateServerConnections(context.Background(), 1) + require.NoError(t, err) + + // create initiator + b := NewClient(bPK, bSK, dc) + b.SetLogger(logging.MustGetLogger("B")) + err = b.InitiateServerConnections(context.Background(), 1) + require.NoError(t, err) + + aDone := make(chan struct{}) + var aTransport transport.Transport + var aErr error + go func() { + aTransport, aErr = a.Accept(context.Background()) + close(aDone) + }() + + bTransport, err := b.Dial(context.Background(), aPK) + require.NoError(t, err) + + <-aDone + require.NoError(t, aErr) + + aTpDone := make(chan struct{}) + bTpDone := make(chan struct{}) + + var bErr error + var tpReadWriteWG sync.WaitGroup + tpReadWriteWG.Add(2) + // run infinite reading from tp loop in goroutine + go func() { + for { + select { + case <-aTpDone: + log.Println("ATransport DONE") + tpReadWriteWG.Done() + return + default: + var msg []byte + if _, aErr = aTransport.Read(msg); aErr != nil { + tpReadWriteWG.Done() + return + } + log.Printf("GOT MESSAGE %s", string(msg)) + } + } + }() + + // run infinite writing to tp loop in goroutine + go func() { + for { + select { + case <-bTpDone: + log.Println("BTransport DONE") + tpReadWriteWG.Done() + return + default: + msg := []byte("Hello there!") + log.Println("BEFORE TRANSPORT WRITE") + if _, bErr = bTransport.Write(msg); bErr != nil { + log.Println("ERROR IN TRANSPORT WRITE") + tpReadWriteWG.Done() + return + } + log.Println("AFTER TRANSPORT WRITE") + } + } + }() + + // continue creating transports untill the error occurs + for { + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + + _, err = a.Dial(ctx, bPK) + cancel() + if err != nil { + break + } + } + // must be error + require.Error(t, err) + + // wait more time to ensure that the initially created transport works + time.Sleep(2 * time.Second) + + // stop reading/writing goroutines + close(aTpDone) + close(bTpDone) + + // wait for goroutines to stop + tpReadWriteWG.Wait() + // check that the initial transport had been working properly all the time + require.NoError(t, aErr) + require.NoError(t, bErr) + + err = aTransport.Close() + require.NoError(t, err) + + err = bTransport.Close() + require.NoError(t, err) + + err = a.Close() + require.NoError(t, err) + + err = b.Close() + require.NoError(t, err) + }) } // Given two client instances (a & b) and a server instance (s), From f14c2302fe7db67ac83dd0ee90b36deed7cd30b7 Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Tue, 18 Jun 2019 12:44:40 +0300 Subject: [PATCH 04/11] Fix hanging issue --- pkg/dmsg/server_test.go | 49 ++++++++++++++--------------------------- 1 file changed, 17 insertions(+), 32 deletions(-) diff --git a/pkg/dmsg/server_test.go b/pkg/dmsg/server_test.go index d3860bc733..814220de83 100644 --- a/pkg/dmsg/server_test.go +++ b/pkg/dmsg/server_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "log" "math" "math/rand" @@ -541,30 +542,6 @@ func TestServer_Serve(t *testing.T) { }) t.Run("test failed accept not hanging already established transport", func(t *testing.T) { - /*f, err := os.Create("./cpu.prof") - if err != nil { - log.Fatalf("Error creating cpu profile: %v\n", err) - } - - if err := pprof.StartCPUProfile(f); err != nil { - log.Fatalf("Error starting cpu profiling: %v\n", err) - } - - defer pprof.StopCPUProfile() - - blockF, err := os.Create("./block.prof") - if err != nil { - log.Fatalf("Error creating block profile: %v\n", err) - } - - runtime.SetBlockProfileRate(1) - p := pprof.Lookup("block") - defer func() { - if err := p.WriteTo(blockF, 0); err != nil { - log.Fatalf("Error saving block profile: %v\n", err) - } - }()*/ - // generate keys for both clients aPK, aSK := cipher.GenerateKeyPair() bPK, bSK := cipher.GenerateKeyPair() @@ -658,6 +635,12 @@ func TestServer_Serve(t *testing.T) { // wait more time to ensure that the initially created transport works time.Sleep(2 * time.Second) + err = aTransport.Close() + require.NoError(t, err) + + err = bTransport.Close() + require.NoError(t, err) + // stop reading/writing goroutines close(aTpDone) close(bTpDone) @@ -665,19 +648,21 @@ func TestServer_Serve(t *testing.T) { // wait for goroutines to stop tpReadWriteWG.Wait() // check that the initial transport had been working properly all the time - require.NoError(t, aErr) - require.NoError(t, bErr) - - err = aTransport.Close() - require.NoError(t, err) - - err = bTransport.Close() - require.NoError(t, err) + // if any error, it must be `io.EOF` for reader + if aErr != io.EOF { + require.NoError(t, aErr) + } + // if any error, it must be `io.ErrClosedPipe` for writer + if bErr != io.ErrClosedPipe { + require.NoError(t, bErr) + } err = a.Close() require.NoError(t, err) + b.log.Println("BEFORE CLOSING") err = b.Close() + b.log.Println("AFTER CLOSING") require.NoError(t, err) }) From c68db44da3c44b63af40e3fea55ef3a438e7fe5e Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Tue, 18 Jun 2019 13:48:04 +0300 Subject: [PATCH 05/11] Move `Accept` out of goroutine for the first `Serve` test --- pkg/dmsg/server_test.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/pkg/dmsg/server_test.go b/pkg/dmsg/server_test.go index 814220de83..a7a7a40f71 100644 --- a/pkg/dmsg/server_test.go +++ b/pkg/dmsg/server_test.go @@ -194,19 +194,11 @@ func TestServer_Serve(t *testing.T) { err = b.InitiateServerConnections(context.Background(), 1) require.NoError(t, err) - aDone := make(chan struct{}) - var aTransport transport.Transport - var aErr error - go func() { - aTransport, aErr = a.Accept(context.Background()) - close(aDone) - }() - bTransport, err := b.Dial(context.Background(), aPK) require.NoError(t, err) - <-aDone - require.NoError(t, aErr) + aTransport, err := a.Accept(context.Background()) + require.NoError(t, err) // must be 2 ServerConn's require.Equal(t, 2, s.connCount()) From 647a89115b1748d656144d5d49ed81ad570147c5 Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Tue, 18 Jun 2019 13:53:23 +0300 Subject: [PATCH 06/11] Fix the `concurrent writes to map` error --- pkg/dmsg/server_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/dmsg/server_test.go b/pkg/dmsg/server_test.go index a7a7a40f71..14ccc4fe5d 100644 --- a/pkg/dmsg/server_test.go +++ b/pkg/dmsg/server_test.go @@ -339,6 +339,7 @@ func TestServer_Serve(t *testing.T) { // channel to listen for `Accept` errors. Any single error must // fail the test acceptErrs := make(chan error, totalRemoteTpsCount) + var remotesTpsMX sync.Mutex remotesTps := make(map[int][]transport.Transport, len(usedRemotes)) var remotesWG sync.WaitGroup remotesWG.Add(totalRemoteTpsCount) @@ -359,7 +360,9 @@ func TestServer_Serve(t *testing.T) { } // store transport + remotesTpsMX.Lock() remotesTps[remoteInd] = append(remotesTps[remoteInd], transport) + remotesTpsMX.Unlock() remotesWG.Done() }(i) @@ -370,6 +373,7 @@ func TestServer_Serve(t *testing.T) { // channel to listen for `Dial` errors. Any single error must // fail the test dialErrs := make(chan error, initiatorsCount) + var initiatorsTpsMx sync.Mutex initiatorsTps := make([]transport.Transport, 0, initiatorsCount) var initiatorsWG sync.WaitGroup initiatorsWG.Add(initiatorsCount) @@ -388,7 +392,9 @@ func TestServer_Serve(t *testing.T) { } // store transport + initiatorsTpsMx.Lock() initiatorsTps = append(initiatorsTps, transport) + initiatorsTpsMx.Unlock() initiatorsWG.Done() }(i) From 93dbc71315992c12ebf12d99bfa71aa0606d5867 Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Tue, 18 Jun 2019 14:17:22 +0300 Subject: [PATCH 07/11] Move read/write operations to a single goroutine for failed transport test --- pkg/dmsg/server_test.go | 211 ++++------------------------------------ 1 file changed, 19 insertions(+), 192 deletions(-) diff --git a/pkg/dmsg/server_test.go b/pkg/dmsg/server_test.go index 14ccc4fe5d..04a7d4e46b 100644 --- a/pkg/dmsg/server_test.go +++ b/pkg/dmsg/server_test.go @@ -556,68 +556,37 @@ func TestServer_Serve(t *testing.T) { err = b.InitiateServerConnections(context.Background(), 1) require.NoError(t, err) - aDone := make(chan struct{}) - var aTransport transport.Transport - var aErr error - go func() { - aTransport, aErr = a.Accept(context.Background()) - close(aDone) - }() - bTransport, err := b.Dial(context.Background(), aPK) require.NoError(t, err) - <-aDone - require.NoError(t, aErr) + aTransport, err := a.Accept(context.Background()) + require.NoError(t, err) - aTpDone := make(chan struct{}) - bTpDone := make(chan struct{}) + readWriteStop := make(chan struct{}) + readWriteDone := make(chan struct{}) - var bErr error - var tpReadWriteWG sync.WaitGroup - tpReadWriteWG.Add(2) - // run infinite reading from tp loop in goroutine + var readErr, writeErr error go func() { for { select { - case <-aTpDone: - log.Println("ATransport DONE") - tpReadWriteWG.Done() + case <-readWriteStop: + close(readWriteDone) return default: - //var msg []byte - msg := make([]byte, 13) - if _, aErr = aTransport.Read(msg); aErr != nil { - tpReadWriteWG.Done() + msg := []byte("Hello there!") + if _, writeErr = bTransport.Write(msg); writeErr != nil { + close(readWriteDone) return } - //log.Printf("GOT MESSAGE %s", string(msg)) - } - } - }() - - // run infinite writing to tp loop in goroutine - go func() { - for { - select { - case <-bTpDone: - log.Println("BTransport DONE") - tpReadWriteWG.Done() - return - default: - msg := []byte("Hello there!") - log.Println("BEFORE TRANSPORT WRITE") - if _, bErr = bTransport.Write(msg); bErr != nil { - log.Println("ERROR IN TRANSPORT WRITE") - tpReadWriteWG.Done() + if _, readErr = aTransport.Read(msg); readErr != nil { + close(readWriteDone) return } - log.Println("AFTER TRANSPORT WRITE") } } }() - // continue creating transports untill the error occurs + // continue creating transports until the error occurs for { ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) @@ -640,19 +609,17 @@ func TestServer_Serve(t *testing.T) { require.NoError(t, err) // stop reading/writing goroutines - close(aTpDone) - close(bTpDone) + close(readWriteStop) + <-readWriteDone - // wait for goroutines to stop - tpReadWriteWG.Wait() // check that the initial transport had been working properly all the time // if any error, it must be `io.EOF` for reader - if aErr != io.EOF { - require.NoError(t, aErr) + if readErr != io.EOF { + require.NoError(t, readErr) } // if any error, it must be `io.ErrClosedPipe` for writer - if bErr != io.ErrClosedPipe { - require.NoError(t, bErr) + if writeErr != io.ErrClosedPipe { + require.NoError(t, writeErr) } err = a.Close() @@ -663,146 +630,6 @@ func TestServer_Serve(t *testing.T) { b.log.Println("AFTER CLOSING") require.NoError(t, err) }) - - t.Run("test failed accept not hanging already established transport 2", func(t *testing.T) { - /*f, err := os.Create("./cpu.prof") - if err != nil { - log.Fatalf("Error creating cpu profile: %v\n", err) - } - - if err := pprof.StartCPUProfile(f); err != nil { - log.Fatalf("Error starting cpu profiling: %v\n", err) - } - - defer pprof.StopCPUProfile() - - blockF, err := os.Create("./block.prof") - if err != nil { - log.Fatalf("Error creating block profile: %v\n", err) - } - - runtime.SetBlockProfileRate(1) - p := pprof.Lookup("block") - defer func() { - if err := p.WriteTo(blockF, 0); err != nil { - log.Fatalf("Error saving block profile: %v\n", err) - } - }()*/ - - // generate keys for both clients - aPK, aSK := cipher.GenerateKeyPair() - bPK, bSK := cipher.GenerateKeyPair() - - // create remote - a := NewClient(aPK, aSK, dc) - a.SetLogger(logging.MustGetLogger("A")) - err = a.InitiateServerConnections(context.Background(), 1) - require.NoError(t, err) - - // create initiator - b := NewClient(bPK, bSK, dc) - b.SetLogger(logging.MustGetLogger("B")) - err = b.InitiateServerConnections(context.Background(), 1) - require.NoError(t, err) - - aDone := make(chan struct{}) - var aTransport transport.Transport - var aErr error - go func() { - aTransport, aErr = a.Accept(context.Background()) - close(aDone) - }() - - bTransport, err := b.Dial(context.Background(), aPK) - require.NoError(t, err) - - <-aDone - require.NoError(t, aErr) - - aTpDone := make(chan struct{}) - bTpDone := make(chan struct{}) - - var bErr error - var tpReadWriteWG sync.WaitGroup - tpReadWriteWG.Add(2) - // run infinite reading from tp loop in goroutine - go func() { - for { - select { - case <-aTpDone: - log.Println("ATransport DONE") - tpReadWriteWG.Done() - return - default: - var msg []byte - if _, aErr = aTransport.Read(msg); aErr != nil { - tpReadWriteWG.Done() - return - } - log.Printf("GOT MESSAGE %s", string(msg)) - } - } - }() - - // run infinite writing to tp loop in goroutine - go func() { - for { - select { - case <-bTpDone: - log.Println("BTransport DONE") - tpReadWriteWG.Done() - return - default: - msg := []byte("Hello there!") - log.Println("BEFORE TRANSPORT WRITE") - if _, bErr = bTransport.Write(msg); bErr != nil { - log.Println("ERROR IN TRANSPORT WRITE") - tpReadWriteWG.Done() - return - } - log.Println("AFTER TRANSPORT WRITE") - } - } - }() - - // continue creating transports untill the error occurs - for { - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - - _, err = a.Dial(ctx, bPK) - cancel() - if err != nil { - break - } - } - // must be error - require.Error(t, err) - - // wait more time to ensure that the initially created transport works - time.Sleep(2 * time.Second) - - // stop reading/writing goroutines - close(aTpDone) - close(bTpDone) - - // wait for goroutines to stop - tpReadWriteWG.Wait() - // check that the initial transport had been working properly all the time - require.NoError(t, aErr) - require.NoError(t, bErr) - - err = aTransport.Close() - require.NoError(t, err) - - err = bTransport.Close() - require.NoError(t, err) - - err = a.Close() - require.NoError(t, err) - - err = b.Close() - require.NoError(t, err) - }) } // Given two client instances (a & b) and a server instance (s), From 0228f68725f2a5fd83ca1dae306afb98db9652e5 Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Tue, 18 Jun 2019 14:20:47 +0300 Subject: [PATCH 08/11] Change failed transport test's name --- pkg/dmsg/server_test.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/dmsg/server_test.go b/pkg/dmsg/server_test.go index 04a7d4e46b..58ae86d59a 100644 --- a/pkg/dmsg/server_test.go +++ b/pkg/dmsg/server_test.go @@ -539,7 +539,7 @@ func TestServer_Serve(t *testing.T) { } }) - t.Run("test failed accept not hanging already established transport", func(t *testing.T) { + t.Run("test failed_accept_should_not_hang_established_transport", func(t *testing.T) { // generate keys for both clients aPK, aSK := cipher.GenerateKeyPair() bPK, bSK := cipher.GenerateKeyPair() @@ -588,11 +588,9 @@ func TestServer_Serve(t *testing.T) { // continue creating transports until the error occurs for { - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - - _, err = a.Dial(ctx, bPK) - cancel() - if err != nil { + ctx := context.Background() + //ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond) + if _, err = a.Dial(ctx, bPK); err != nil { break } } From 504cc375a91eba6f8680c40d692f4fab051a5cba Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Tue, 18 Jun 2019 15:59:10 +0300 Subject: [PATCH 09/11] Add test for sent/received msg consistency --- pkg/dmsg/server_test.go | 71 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 69 insertions(+), 2 deletions(-) diff --git a/pkg/dmsg/server_test.go b/pkg/dmsg/server_test.go index 58ae86d59a..f64a122b48 100644 --- a/pkg/dmsg/server_test.go +++ b/pkg/dmsg/server_test.go @@ -539,7 +539,7 @@ func TestServer_Serve(t *testing.T) { } }) - t.Run("test failed_accept_should_not_hang_established_transport", func(t *testing.T) { + t.Run("test failed accept not hanging already established transport", func(t *testing.T) { // generate keys for both clients aPK, aSK := cipher.GenerateKeyPair() bPK, bSK := cipher.GenerateKeyPair() @@ -590,7 +590,7 @@ func TestServer_Serve(t *testing.T) { for { ctx := context.Background() //ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond) - if _, err = a.Dial(ctx, bPK); err != nil { + if _, err := a.Dial(ctx, bPK); err != nil { break } } @@ -628,6 +628,73 @@ func TestServer_Serve(t *testing.T) { b.log.Println("AFTER CLOSING") require.NoError(t, err) }) + + t.Run("test sent/received message consistency", func(t *testing.T) { + // generate keys for both clients + aPK, aSK := cipher.GenerateKeyPair() + bPK, bSK := cipher.GenerateKeyPair() + + // create remote + a := NewClient(aPK, aSK, dc) + a.SetLogger(logging.MustGetLogger("A")) + err = a.InitiateServerConnections(context.Background(), 1) + require.NoError(t, err) + + // create initiator + b := NewClient(bPK, bSK, dc) + b.SetLogger(logging.MustGetLogger("B")) + err = b.InitiateServerConnections(context.Background(), 1) + require.NoError(t, err) + + bTransport, err := b.Dial(context.Background(), aPK) + require.NoError(t, err) + + aTransport, err := a.Accept(context.Background()) + require.NoError(t, err) + + msgCount := 100 + for i := 0; i < msgCount; i++ { + msg := "Hello there!" + + _, err := bTransport.Write([]byte(msg)) + require.NoError(t, err) + + recMsg := make([]byte, 5) + n, err := aTransport.Read(recMsg) + require.NoError(t, err) + + received := string(recMsg[:n]) + + log.Printf("Received: %v , bytes: %v", received, n) + + n, err = aTransport.Read(recMsg) + require.NoError(t, err) + + received += string(recMsg[:n]) + + log.Printf("Received: %v , bytes: %v", received, n) + + n, err = aTransport.Read(recMsg) + require.NoError(t, err) + + received += string(recMsg[:n]) + log.Printf("Last bytes count: %v", n) + + require.Equal(t, received, msg) + } + + err = bTransport.Close() + require.NoError(t, err) + + err = aTransport.Close() + require.NoError(t, err) + + err = a.Close() + require.NoError(t, err) + + err = b.Close() + require.NoError(t, err) + }) } // Given two client instances (a & b) and a server instance (s), From f27487030389b7c11db246b2c939881250401873 Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Tue, 18 Jun 2019 18:19:00 +0300 Subject: [PATCH 10/11] Add some more comments --- pkg/dmsg/server_test.go | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/pkg/dmsg/server_test.go b/pkg/dmsg/server_test.go index f64a122b48..0f617a54c1 100644 --- a/pkg/dmsg/server_test.go +++ b/pkg/dmsg/server_test.go @@ -567,6 +567,7 @@ func TestServer_Serve(t *testing.T) { var readErr, writeErr error go func() { + // read/write to/from transport until the stop signal arrives for { select { case <-readWriteStop: @@ -646,6 +647,7 @@ func TestServer_Serve(t *testing.T) { err = b.InitiateServerConnections(context.Background(), 1) require.NoError(t, err) + // create transports bTransport, err := b.Dial(context.Background(), aPK) require.NoError(t, err) @@ -656,30 +658,35 @@ func TestServer_Serve(t *testing.T) { for i := 0; i < msgCount; i++ { msg := "Hello there!" + // write message of 12 bytes _, err := bTransport.Write([]byte(msg)) require.NoError(t, err) - recMsg := make([]byte, 5) - n, err := aTransport.Read(recMsg) - require.NoError(t, err) + // create a receiving buffer of 5 bytes + recBuff := make([]byte, 5) - received := string(recMsg[:n]) + // read 5 bytes, 7 left + n, err := aTransport.Read(recBuff) + require.NoError(t, err) + require.Equal(t, n, len(recBuff)) - log.Printf("Received: %v , bytes: %v", received, n) + received := string(recBuff[:n]) - n, err = aTransport.Read(recMsg) + // read 5 more, 2 left + n, err = aTransport.Read(recBuff) require.NoError(t, err) + require.Equal(t, n, len(recBuff)) - received += string(recMsg[:n]) - - log.Printf("Received: %v , bytes: %v", received, n) + received += string(recBuff[:n]) - n, err = aTransport.Read(recMsg) + // read 2 bytes left + n, err = aTransport.Read(recBuff) require.NoError(t, err) + require.Equal(t, n, len(msg)-len(recBuff)*2) - received += string(recMsg[:n]) - log.Printf("Last bytes count: %v", n) + received += string(recBuff[:n]) + // received string must be equal to the sent one require.Equal(t, received, msg) } From c7aeacde1c8f779bc24dd71b32984d62e9f7a62e Mon Sep 17 00:00:00 2001 From: Sir Darkrengarius Date: Tue, 18 Jun 2019 18:28:39 +0300 Subject: [PATCH 11/11] Add one more case to the failed transport test --- pkg/dmsg/server_test.go | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/pkg/dmsg/server_test.go b/pkg/dmsg/server_test.go index 0f617a54c1..2a4b8a9b10 100644 --- a/pkg/dmsg/server_test.go +++ b/pkg/dmsg/server_test.go @@ -589,9 +589,20 @@ func TestServer_Serve(t *testing.T) { // continue creating transports until the error occurs for { - ctx := context.Background() - //ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond) - if _, err := a.Dial(ctx, bPK); err != nil { + //ctx := context.Background() + ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond) + if _, err = a.Dial(ctx, bPK); err != nil { + break + } + } + // must be error + require.Error(t, err) + + // the same as above, transport is created by another client + for { + //ctx := context.Background() + ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond) + if _, err = b.Dial(ctx, aPK); err != nil { break } } @@ -702,6 +713,10 @@ func TestServer_Serve(t *testing.T) { err = b.Close() require.NoError(t, err) }) + + t.Run("test capped_transport_buffer_should_not_result_in_hang", func(t *testing.T) { + + }) } // Given two client instances (a & b) and a server instance (s),