Skip to content

Commit

Permalink
Add oauth2 http proxy client (#405)
Browse files Browse the repository at this point in the history
Signed-off-by: byhsu <[email protected]>
  • Loading branch information
ByronHsu authored May 24, 2023
1 parent 5f87602 commit 3e10bb8
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 1 deletion.
19 changes: 19 additions & 0 deletions flyteidl/clients/go/admin/auth_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package admin
import (
"context"
"fmt"
"net/http"

"github.com/flyteorg/flyteidl/clients/go/admin/cache"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"
"github.com/flyteorg/flytestdlib/logger"
"golang.org/x/oauth2"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -50,6 +52,21 @@ func shouldAttemptToAuthenticate(errorCode codes.Code) bool {
return errorCode == codes.Unauthenticated
}

// Set up http client used in oauth2
func setHTTPClientContext(ctx context.Context, cfg *Config) context.Context {
httpClient := &http.Client{}

if len(cfg.HTTPProxyURL.String()) > 0 {
// create a transport that uses the proxy
transport := &http.Transport{
Proxy: http.ProxyURL(&cfg.HTTPProxyURL.URL),
}
httpClient.Transport = transport
}

return context.WithValue(ctx, oauth2.HTTPClient, httpClient)
}

// NewAuthInterceptor creates a new grpc.UnaryClientInterceptor that forwards the grpc call and inspects the error.
// It will first invoke the grpc pipeline (to proceed with the request) with no modifications. It's expected for the grpc
// pipeline to already have a grpc.WithPerRPCCredentials() DialOption. If the perRPCCredentials has already been initialized,
Expand All @@ -62,6 +79,8 @@ func shouldAttemptToAuthenticate(errorCode codes.Code) bool {
// be able to find and acquire a valid AccessToken to annotate the request with.
func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFuture *PerRPCCredentialsFuture) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx = setHTTPClientContext(ctx, cfg)

err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
logger.Debugf(ctx, "Request failed due to [%v]. If it's an unauthenticated error, we will attempt to establish an authenticated context.", err)
Expand Down
3 changes: 3 additions & 0 deletions flyteidl/clients/go/admin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ type Config struct {
// find the full schema here https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto#L625
// Note that required packages may need to be preloaded to support certain service config. For example "google.golang.org/grpc/balancer/roundrobin" should be preloaded to have round-robin policy supported.
DefaultServiceConfig string `json:"defaultServiceConfig" pdflag:",Set the default service config for the admin gRPC client"`

// HTTPProxyURL allows operators to access external OAuth2 servers using an external HTTP Proxy
HTTPProxyURL config.URL `json:"httpProxyURL" pflag:",OPTIONAL: HTTP Proxy to be used for OAuth requests."`
}

var (
Expand Down
3 changes: 2 additions & 1 deletion flyteidl/clients/go/admin/token_source_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,14 @@ func (s *customTokenSource) Token() (*oauth2.Token, error) {

token, err := s.new.Token()
if err != nil {
logger.Warnf(s.ctx, "failed to get token: %w", err)
return nil, fmt.Errorf("failed to get token: %w", err)
}
logger.Infof(s.ctx, "retrieved token with expiry %v", token.Expiry)

err = s.tokenCache.SaveToken(token)
if err != nil {
logger.Warnf(s.ctx, "failed to cache token")
logger.Warnf(s.ctx, "failed to cache token: %w", err)
}

return token, nil
Expand Down

0 comments on commit 3e10bb8

Please sign in to comment.