diff --git a/pkg/dmsg/server_test.go b/pkg/dmsg/server_test.go index 1c40d514ee..8b74d4fced 100644 --- a/pkg/dmsg/server_test.go +++ b/pkg/dmsg/server_test.go @@ -540,7 +540,7 @@ func TestServer_Serve(t *testing.T) { } }) - t.Run("test failed accept not hanging already established transport", func(t *testing.T) { + t.Run("failed_accepts_should_not_result_in_hang", func(t *testing.T) { // generate keys for both clients aPK, aSK := cipher.GenerateKeyPair() bPK, bSK := cipher.GenerateKeyPair() @@ -632,9 +632,7 @@ func TestServer_Serve(t *testing.T) { err = a.Close() require.NoError(t, err) - b.log.Println("BEFORE CLOSING") err = b.Close() - b.log.Println("AFTER CLOSING") require.NoError(t, err) }) @@ -709,8 +707,65 @@ func TestServer_Serve(t *testing.T) { require.NoError(t, err) }) - t.Run("test capped_transport_buffer_should_not_result_in_hang", func(t *testing.T) { + t.Run("capped_transport_buffer_should_not_result_in_hang", func(t *testing.T) { + // generate keys for both clients + aPK, aSK := cipher.GenerateKeyPair() + bPK, bSK := cipher.GenerateKeyPair() + + // create remote + a := NewClient(aPK, aSK, dc, SetLogger(logging.MustGetLogger("A"))) + err = a.InitiateServerConnections(context.Background(), 1) + require.NoError(t, err) + + // create initiator + b := NewClient(bPK, bSK, dc, SetLogger(logging.MustGetLogger("B"))) + err = b.InitiateServerConnections(context.Background(), 1) + require.NoError(t, err) + + // create transports + aWrTransport, err := a.Dial(context.Background(), bPK) + require.NoError(t, err) + + _, err = b.Accept(context.Background()) + require.NoError(t, err) + msg := []byte("Hello there!") + // exact iterations to fill the receiving buffer and hang `Write` + iterationsToDo := tpBufCap/len(msg) + 1 + + // fill the buffer, but no block yet + for i := 0; i < iterationsToDo-1; i++ { + _, err = aWrTransport.Write(msg) + require.NoError(t, err) + } + + // block on `Write` + go func() { + _, err = aWrTransport.Write(msg) + require.NoError(t, err) + }() + + // wait till it's definitely blocked + time.Sleep(1 * time.Second) + + // create new transport from `B` to `A` + bWrTransport, err := b.Dial(context.Background(), aPK) + require.NoError(t, err) + + aRdTransport, err := a.Accept(context.Background()) + require.NoError(t, err) + + // try to write/read message via the new transports + for i := 0; i < 100; i++ { + _, err := bWrTransport.Write(msg) + require.NoError(t, err) + + recBuff := make([]byte, len(msg)) + _, err = aRdTransport.Read(recBuff) + require.NoError(t, err) + + require.Equal(t, recBuff, msg) + } }) }