From dde763554a026bb9c2a9e0a8b94c17689329a97e Mon Sep 17 00:00:00 2001 From: Fei Hu Date: Tue, 16 Aug 2022 16:37:08 -0700 Subject: [PATCH 01/10] Add peer infomation to http2server context. --- internal/transport/http2_server.go | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 28bcba0a33c6..e4eb571eae62 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -265,6 +265,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, czData: new(channelzData), bufferPool: newBufferPool(), } + // Add peer information to the http2server context. + t.ctx = peer.NewContext(t.ctx, t.getPeer()) + t.controlBuf = newControlBuffer(t.done) if dynamicWindow { t.bdpEst = &bdpEstimator{ @@ -485,14 +488,10 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( } else { s.ctx, s.cancel = context.WithCancel(t.ctx) } - pr := &peer.Peer{ - Addr: t.remoteAddr, - } - // Attach Auth info if there is any. - if t.authInfo != nil { - pr.AuthInfo = t.authInfo - } - s.ctx = peer.NewContext(s.ctx, pr) + + // Add peer information to the stream context. + s.ctx = peer.NewContext(s.ctx, t.getPeer()) + // Attach the received metadata to the context. if len(mdata) > 0 { s.ctx = metadata.NewIncomingContext(s.ctx, mdata) @@ -1416,6 +1415,17 @@ func (t *http2Server) getOutFlowWindow() int64 { } } +func (t *http2Server) getPeer() *peer.Peer { + pr := &peer.Peer{ + Addr: t.remoteAddr, + } + // Attach Auth info if there is any. + if t.authInfo != nil { + pr.AuthInfo = t.authInfo + } + return pr +} + func getJitter(v time.Duration) time.Duration { if v == infinity { return 0 From 4a55ac5a37e98849d576a18e736f95c0559cc889 Mon Sep 17 00:00:00 2001 From: Fei Hu Date: Wed, 17 Aug 2022 08:42:47 -0700 Subject: [PATCH 02/10] small update --- internal/transport/http2_server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index e4eb571eae62..1011c573fefe 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -265,6 +265,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, czData: new(channelzData), bufferPool: newBufferPool(), } + // Add peer information to the http2server context. t.ctx = peer.NewContext(t.ctx, t.getPeer()) From 33e6de3d0e333b6a4b59782174e6bce3ca75b647 Mon Sep 17 00:00:00 2001 From: Fei Hu Date: Wed, 17 Aug 2022 09:22:20 -0700 Subject: [PATCH 03/10] Revert "small update" This reverts commit 4a55ac5a37e98849d576a18e736f95c0559cc889. --- internal/transport/http2_server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 1011c573fefe..e4eb571eae62 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -265,7 +265,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, czData: new(channelzData), bufferPool: newBufferPool(), } - // Add peer information to the http2server context. t.ctx = peer.NewContext(t.ctx, t.getPeer()) From a2e0810efdec0b08a3021281ba838d604dacf1b3 Mon Sep 17 00:00:00 2001 From: Fei Hu Date: Sun, 21 Aug 2022 22:05:01 -0700 Subject: [PATCH 04/10] pr --- internal/transport/http2_server.go | 3 --- internal/transport/transport_test.go | 19 +++++++++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index e4eb571eae62..5fb540ca4eaf 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -489,9 +489,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( s.ctx, s.cancel = context.WithCancel(t.ctx) } - // Add peer information to the stream context. - s.ctx = peer.NewContext(s.ctx, t.getPeer()) - // Attach the received metadata to the context. if len(mdata) > 0 { s.ctx = metadata.NewIncomingContext(s.ctx, mdata) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index c1f9664ada67..defafe0735ef 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -35,6 +35,8 @@ import ( "testing" "time" + "google.golang.org/grpc/peer" + "github.com/google/go-cmp/cmp" "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" @@ -2450,3 +2452,20 @@ func TestConnectionError_Unwrap(t *testing.T) { t.Error("ConnectionError does not unwrap") } } + +// Verify Peer is set in server context. +func TestPeerSetInServerContext(t *testing.T) { + server := setUpServerOnly(t, 0, &ServerConfig{}, suspended) + defer server.stop() + + for k := range server.conns { + sc, ok := k.(*http2Server) + if !ok { + t.Fatalf("Failed to convert %v to *http2Server", k) + } + _, ok = peer.FromContext(sc.ctx) + if !ok { + t.Fatalf("peer expected in server context, but actually not found.") + } + } +} From 23e151dc75eb6221c4d349340706e38cbc0a188d Mon Sep 17 00:00:00 2001 From: Fei Hu Date: Tue, 23 Aug 2022 14:20:18 -0700 Subject: [PATCH 05/10] handle feedbacks --- internal/transport/http2_client.go | 4 +- internal/transport/transport_test.go | 58 +++++++++++++++++++++++----- 2 files changed, 52 insertions(+), 10 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 28c77af70aba..53643fa97477 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -326,6 +326,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts keepaliveEnabled: keepaliveEnabled, bufferPool: newBufferPool(), } + // Add peer information to the http2client context. + t.ctx = peer.NewContext(t.ctx, t.getPeer()) if md, ok := addr.Metadata.(*metadata.MD); ok { t.md = *md @@ -469,7 +471,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { func (t *http2Client) getPeer() *peer.Peer { return &peer.Peer{ Addr: t.remoteAddr, - AuthInfo: t.authInfo, + AuthInfo: t.authInfo, // Can be nil } } diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index defafe0735ef..2b0171ea1e36 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2454,18 +2454,58 @@ func TestConnectionError_Unwrap(t *testing.T) { } // Verify Peer is set in server context. -func TestPeerSetInServerContext(t *testing.T) { - server := setUpServerOnly(t, 0, &ServerConfig{}, suspended) +func (s) TestPeerSetInServerContext(t *testing.T) { + // create client and server transports. + server, client, cancel := setUp(t, 0, math.MaxUint32, normal) + defer cancel() defer server.stop() + defer client.Close(fmt.Errorf("closed manually by test")) - for k := range server.conns { - sc, ok := k.(*http2Server) - if !ok { - t.Fatalf("Failed to convert %v to *http2Server", k) + // create a stream with client transport. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream, err := client.NewStream(ctx, &CallHdr{ + Host: "localhost", + Method: "foo.Small", + }) + if err != nil { + t.Fatalf("failed to create a stream: %v", err) + } + + // verify if peer is set in client transport context. + if _, ok := peer.FromContext(client.ctx); !ok { + t.Fatalf("Peer expected in client transport's context, but actually not found.") + } + + // verify of peer is set in stream context. + if _, ok := peer.FromContext(stream.ctx); !ok { + t.Fatalf("Peer expected in stream context, but actually not found.") + } + + // verify if peer is set in server transport context. + count := 0 + for { + server.mu.Lock() + if len(server.conns) == 0 { + server.mu.Unlock() + time.Sleep(time.Millisecond) + count += 1 + // wait for server transport setup for apprximately 1 second. + if count >= 1000 { + t.Fatalf("timed out waiting for server transport setup.") + } + continue } - _, ok = peer.FromContext(sc.ctx) - if !ok { - t.Fatalf("peer expected in server context, but actually not found.") + for k := range server.conns { + sc, ok := k.(*http2Server) + if !ok { + t.Fatalf("ServerTransport is of type %T, want %T", k, &http2Server{}) + } + if _, ok = peer.FromContext(sc.ctx); !ok { + t.Fatalf("Peer expected in server transport's context, but actually not found.") + } } + server.mu.Unlock() + break } } From 09496812d92e5f42be42c2a9ab9420e2a3559027 Mon Sep 17 00:00:00 2001 From: Fei Hu Date: Tue, 23 Aug 2022 14:22:26 -0700 Subject: [PATCH 06/10] handle feedbacks --- internal/transport/http2_server.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 5fb540ca4eaf..3dd15647bc84 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -1413,14 +1413,10 @@ func (t *http2Server) getOutFlowWindow() int64 { } func (t *http2Server) getPeer() *peer.Peer { - pr := &peer.Peer{ - Addr: t.remoteAddr, + return &peer.Peer{ + Addr: t.remoteAddr, + AuthInfo: t.authInfo, // Can be nil } - // Attach Auth info if there is any. - if t.authInfo != nil { - pr.AuthInfo = t.authInfo - } - return pr } func getJitter(v time.Duration) time.Duration { From 661205c0d1ea90364409ba5bd462b5c889b11347 Mon Sep 17 00:00:00 2001 From: Fei Hu Date: Tue, 23 Aug 2022 14:22:55 -0700 Subject: [PATCH 07/10] handle feedbacks --- internal/transport/transport_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 2b0171ea1e36..080e2b1a5302 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2453,7 +2453,6 @@ func TestConnectionError_Unwrap(t *testing.T) { } } -// Verify Peer is set in server context. func (s) TestPeerSetInServerContext(t *testing.T) { // create client and server transports. server, client, cancel := setUp(t, 0, math.MaxUint32, normal) @@ -2472,17 +2471,17 @@ func (s) TestPeerSetInServerContext(t *testing.T) { t.Fatalf("failed to create a stream: %v", err) } - // verify if peer is set in client transport context. + // verify peer is set in client transport context. if _, ok := peer.FromContext(client.ctx); !ok { t.Fatalf("Peer expected in client transport's context, but actually not found.") } - // verify of peer is set in stream context. + // verify peer is set in stream context. if _, ok := peer.FromContext(stream.ctx); !ok { t.Fatalf("Peer expected in stream context, but actually not found.") } - // verify if peer is set in server transport context. + // verify peer is set in server transport context. count := 0 for { server.mu.Lock() From 1e0e6d0c3d55d2eda0753dba13677a9c516c519b Mon Sep 17 00:00:00 2001 From: Fei Hu Date: Tue, 23 Aug 2022 14:26:42 -0700 Subject: [PATCH 08/10] pr --- internal/transport/transport_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 080e2b1a5302..2ff4393ad829 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2463,10 +2463,7 @@ func (s) TestPeerSetInServerContext(t *testing.T) { // create a stream with client transport. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - stream, err := client.NewStream(ctx, &CallHdr{ - Host: "localhost", - Method: "foo.Small", - }) + stream, err := client.NewStream(ctx, &CallHdr{}) if err != nil { t.Fatalf("failed to create a stream: %v", err) } From 1bb5d728638bf2989180f0a222d2ebb9b1096936 Mon Sep 17 00:00:00 2001 From: Fei Hu Date: Tue, 23 Aug 2022 17:09:20 -0700 Subject: [PATCH 09/10] pr --- internal/transport/transport_test.go | 40 +++++++++++++--------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 2ff4393ad829..760e1b64f358 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2468,6 +2468,16 @@ func (s) TestPeerSetInServerContext(t *testing.T) { t.Fatalf("failed to create a stream: %v", err) } + waitWhileTrue(t, func() (bool, error) { + server.mu.Lock() + defer server.mu.Unlock() + + if len(server.conns) == 0 { + return true, fmt.Errorf("timed-out while waiting for connection to be created on the server") + } + return false, nil + }) + // verify peer is set in client transport context. if _, ok := peer.FromContext(client.ctx); !ok { t.Fatalf("Peer expected in client transport's context, but actually not found.") @@ -2479,29 +2489,15 @@ func (s) TestPeerSetInServerContext(t *testing.T) { } // verify peer is set in server transport context. - count := 0 - for { - server.mu.Lock() - if len(server.conns) == 0 { - server.mu.Unlock() - time.Sleep(time.Millisecond) - count += 1 - // wait for server transport setup for apprximately 1 second. - if count >= 1000 { - t.Fatalf("timed out waiting for server transport setup.") - } - continue + server.mu.Lock() + for k := range server.conns { + sc, ok := k.(*http2Server) + if !ok { + t.Fatalf("ServerTransport is of type %T, want %T", k, &http2Server{}) } - for k := range server.conns { - sc, ok := k.(*http2Server) - if !ok { - t.Fatalf("ServerTransport is of type %T, want %T", k, &http2Server{}) - } - if _, ok = peer.FromContext(sc.ctx); !ok { - t.Fatalf("Peer expected in server transport's context, but actually not found.") - } + if _, ok = peer.FromContext(sc.ctx); !ok { + t.Fatalf("Peer expected in server transport's context, but actually not found.") } - server.mu.Unlock() - break } + server.mu.Unlock() } From ee6c28aa36f69ee6a1ca9e3f55a690c8a214057a Mon Sep 17 00:00:00 2001 From: Fei Hu Date: Wed, 24 Aug 2022 09:17:31 -0700 Subject: [PATCH 10/10] [empty] rerun ci