-
Notifications
You must be signed in to change notification settings - Fork 437
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Work in progress] Webscocket channel, Do not merge. #1084
Changes from 11 commits
bc81b66
e6535c8
4a80e24
bcd6d33
6021aa5
0cf0e68
df1ffe7
17b5333
9acf824
a80b016
49093b1
997610a
cfa73f4
b8d2468
ef6924c
2d53d98
b25186d
1a8fcc3
0acb1eb
63eeebb
6a8d151
d7ac9c0
0dc62a5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ type options struct { | |
websocketReadLimit int64 | ||
allowNonRootResources bool | ||
endpointsFunc *func() []string | ||
enableWebsocketChannels bool | ||
} | ||
|
||
func evaluateOptions(opts []Option) *options { | ||
|
@@ -126,6 +127,16 @@ func WithWebsockets(enableWebsockets bool) Option { | |
} | ||
} | ||
|
||
// WithWebsocketsChannel allows for handling grpc-web requests of websockets charing a single websocket between requests | ||
// - enabling bidirectional requests. | ||
// | ||
// The default behaviour is false, i.e. to disallow websockets channels | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. optional: 'Disabled by default.' |
||
func WithWebsocketsChannel(enableWebsocketsChannels bool) Option { | ||
return func(o *options) { | ||
o.enableWebsocketChannels = enableWebsocketsChannels | ||
} | ||
} | ||
|
||
// WithWebsocketPingInterval enables websocket keepalive pinging with the configured timeout. | ||
// | ||
// The default behaviour is to disable websocket pinging. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -123,6 +123,15 @@ func (w *WrappedGrpcServer) ServeHTTP(resp http.ResponseWriter, req *http.Reques | |
resp.WriteHeader(http.StatusForbidden) | ||
_, _ = resp.Write(make([]byte, 0)) | ||
return | ||
} else if w.opts.enableWebsocketChannels && w.IsGrpcWebSocketChannelRequest(req) { | ||
if w.websocketOriginFunc(req) { | ||
if !w.opts.corsForRegisteredEndpointsOnly || w.isRequestForRegisteredEndpoint(req) { | ||
w.HandleGrpcWebsocketRequest(resp, req) | ||
borissmidt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return | ||
} | ||
} | ||
resp.WriteHeader(http.StatusForbidden) | ||
_, _ = resp.Write(make([]byte, 0)) | ||
} | ||
|
||
if w.IsAcceptableGrpcCorsRequest(req) || w.IsGrpcWebRequest(req) { | ||
|
@@ -138,6 +147,12 @@ func (w *WrappedGrpcServer) IsGrpcWebSocketRequest(req *http.Request) bool { | |
return strings.ToLower(req.Header.Get("Upgrade")) == "websocket" && strings.ToLower(req.Header.Get("Sec-Websocket-Protocol")) == "grpc-websockets" | ||
} | ||
|
||
// IsGrpcWebSocketRequest determines if a request is a gRPC-Web request by checking that the "Sec-Websocket-Protocol" | ||
// header value is "grpc-websocket-channel" | ||
func (w *WrappedGrpcServer) IsGrpcWebSocketChannelRequest(req *http.Request) bool { | ||
return strings.ToLower(req.Header.Get("Upgrade")) == "websocket" && strings.ToLower(req.Header.Get("Sec-Websocket-Protocol")) == "grpc-websocket-channel" | ||
} | ||
|
||
// HandleGrpcWebRequest takes a HTTP request that is assumed to be a gRPC-Web request and wraps it with a compatibility | ||
// layer to transform it to a standard gRPC request for the wrapped gRPC server and transforms the response to comply | ||
// with the gRPC-Web protocol. | ||
|
@@ -221,6 +236,40 @@ func (w *WrappedGrpcServer) IsGrpcWebRequest(req *http.Request) bool { | |
return req.Method == http.MethodPost && strings.HasPrefix(req.Header.Get("content-type"), grpcWebContentType) | ||
} | ||
|
||
// HandleGrpcWebsocketChannelRequest takes a HTTP request that is assumed to be a gRPC-Websocket-channel request and starts a | ||
// duplexed grpc-websocket-channel which will create multiple virtual streams over a single websocket. | ||
func (w *WrappedGrpcServer) HandleGrpcWebsocketChannelRequest(resp http.ResponseWriter, req *http.Request) { | ||
|
||
wsConn, err := websocket.Accept(resp, req, &websocket.AcceptOptions{ | ||
InsecureSkipVerify: true, // managed by ServeHTTP | ||
Subprotocols: []string{"grpc-websocket-channel"}, | ||
}) | ||
if err != nil { | ||
grpclog.Errorf("Unable to upgrade websocket request: %v", err) | ||
return | ||
} | ||
|
||
//todo make this configureable to equal it to the biggest possible grpc message. | ||
wsConn.SetReadLimit(4 * 1024 * 1024) | ||
|
||
headers := make(http.Header) | ||
for _, name := range w.allowedHeaders { | ||
if values, exist := req.Header[name]; exist { | ||
headers[name] = values | ||
} | ||
} | ||
|
||
ctx, cancelFunc := context.WithCancel(req.Context()) | ||
defer cancelFunc() | ||
NewWebsocketChannel(wsConn, w.handler, ctx) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we need to show were this went :) |
||
websocketChannel := NewWebsocketChannel(wsConn, w.handler, ctx) | ||
//todo add support for ping | ||
// if w.opts.websocketPingInterval >= time.Second { | ||
// websokcetChannel.enablePing(w.opts.websocketPingInterval) | ||
// } | ||
websocketChannel.Start() | ||
} | ||
|
||
// IsAcceptableGrpcCorsRequest determines if a request is a CORS pre-flight request for a gRPC-Web request and that this | ||
// request is acceptable for CORS. | ||
// | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,6 +44,8 @@ var ( | |
websocketPingInterval = pflag.Duration("websocket_ping_interval", 0, "whether to use websocket keepalive pinging. Only used when using websockets. Configured interval must be >= 1s.") | ||
websocketReadLimit = pflag.Int64("websocket_read_limit", 0, "sets the maximum message read limit on the underlying websocket. The default message read limit is 32769 bytes.") | ||
|
||
useWebsocketsChannel = pflag.Bool("use_websockets_channels", false, "whether to use a chanllel over websocket transport layer (alpha)") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor: s/chanllel/channel |
||
|
||
flagHttpMaxWriteTimeout = pflag.Duration("server_http_max_write_timeout", 10*time.Second, "HTTP server config, max write duration.") | ||
flagHttpMaxReadTimeout = pflag.Duration("server_http_max_read_timeout", 10*time.Second, "HTTP server config, max read duration.") | ||
|
||
|
@@ -102,6 +104,26 @@ func main() { | |
) | ||
} | ||
|
||
if *useWebsocketsChannel { | ||
logrus.Println("using websockets based channels") | ||
options = append( | ||
options, | ||
grpcweb.WithWebsocketsChannel(true), | ||
grpcweb.WithWebsocketOriginFunc(makeWebsocketOriginFunc(allowedOrigins)), | ||
) | ||
if *websocketPingInterval >= time.Second { | ||
logrus.Infof("websocket keepalive pinging enabled, the timeout interval is %s", websocketPingInterval.String()) | ||
} | ||
if *websocketReadLimit > 0 { | ||
options = append(options, grpcweb.WithWebsocketsMessageReadLimit(*websocketReadLimit)) | ||
} | ||
|
||
options = append( | ||
options, | ||
grpcweb.WithWebsocketPingInterval(*websocketPingInterval), | ||
) | ||
} | ||
|
||
if len(*flagAllowedHeaders) > 0 { | ||
options = append( | ||
options, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: s/charing/sharing