Skip to content

Commit

Permalink
transport: add peer information to http2Server and http2Client context (
Browse files Browse the repository at this point in the history
  • Loading branch information
feihu-stripe authored Aug 24, 2022
1 parent 02fbca0 commit 641dc87
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 9 deletions.
4 changes: 3 additions & 1 deletion internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
}
// Add peer information to the http2client context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())

if md, ok := addr.Metadata.(*metadata.MD); ok {
t.md = *md
Expand Down Expand Up @@ -469,7 +471,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
func (t *http2Client) getPeer() *peer.Peer {
return &peer.Peer{
Addr: t.remoteAddr,
AuthInfo: t.authInfo,
AuthInfo: t.authInfo, // Can be nil
}
}

Expand Down
19 changes: 11 additions & 8 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,9 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
czData: new(channelzData),
bufferPool: newBufferPool(),
}
// Add peer information to the http2server context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())

t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
t.bdpEst = &bdpEstimator{
Expand Down Expand Up @@ -485,14 +488,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
} else {
s.ctx, s.cancel = context.WithCancel(t.ctx)
}
pr := &peer.Peer{
Addr: t.remoteAddr,
}
// Attach Auth info if there is any.
if t.authInfo != nil {
pr.AuthInfo = t.authInfo
}
s.ctx = peer.NewContext(s.ctx, pr)

// Attach the received metadata to the context.
if len(mdata) > 0 {
s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
Expand Down Expand Up @@ -1416,6 +1412,13 @@ func (t *http2Server) getOutFlowWindow() int64 {
}
}

func (t *http2Server) getPeer() *peer.Peer {
return &peer.Peer{
Addr: t.remoteAddr,
AuthInfo: t.authInfo, // Can be nil
}
}

func getJitter(v time.Duration) time.Duration {
if v == infinity {
return 0
Expand Down
51 changes: 51 additions & 0 deletions internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"testing"
"time"

"google.golang.org/grpc/peer"

"github.com/google/go-cmp/cmp"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
Expand Down Expand Up @@ -2450,3 +2452,52 @@ func TestConnectionError_Unwrap(t *testing.T) {
t.Error("ConnectionError does not unwrap")
}
}

func (s) TestPeerSetInServerContext(t *testing.T) {
// create client and server transports.
server, client, cancel := setUp(t, 0, math.MaxUint32, normal)
defer cancel()
defer server.stop()
defer client.Close(fmt.Errorf("closed manually by test"))

// create a stream with client transport.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
stream, err := client.NewStream(ctx, &CallHdr{})
if err != nil {
t.Fatalf("failed to create a stream: %v", err)
}

waitWhileTrue(t, func() (bool, error) {
server.mu.Lock()
defer server.mu.Unlock()

if len(server.conns) == 0 {
return true, fmt.Errorf("timed-out while waiting for connection to be created on the server")
}
return false, nil
})

// verify peer is set in client transport context.
if _, ok := peer.FromContext(client.ctx); !ok {
t.Fatalf("Peer expected in client transport's context, but actually not found.")
}

// verify peer is set in stream context.
if _, ok := peer.FromContext(stream.ctx); !ok {
t.Fatalf("Peer expected in stream context, but actually not found.")
}

// verify peer is set in server transport context.
server.mu.Lock()
for k := range server.conns {
sc, ok := k.(*http2Server)
if !ok {
t.Fatalf("ServerTransport is of type %T, want %T", k, &http2Server{})
}
if _, ok = peer.FromContext(sc.ctx); !ok {
t.Fatalf("Peer expected in server transport's context, but actually not found.")
}
}
server.mu.Unlock()
}

0 comments on commit 641dc87

Please sign in to comment.