Skip to content

Commit

Permalink
Ensure token is refreshed on Unauthenticated (#239)
Browse files Browse the repository at this point in the history
* Ensure token is refreshed on Unauthenticated

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Cleanup

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Add lock/unlock in BaseTokenOrchestrator as well

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* generate

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Unit tests

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* purge the token cache only once

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* revert

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Update Purge to PurgeIfEquals

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Use cached token if one exists

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* lock once

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Implement Condition Variable to avoid serializing auth requests

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* Adding typed error and upgrading stow

* wip

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* updating go.sum

* updating go.sum for all services with latest stow version

* fix some tests

* fix unit tests

* fix stow unit tests and linter fixes

* merge conflict fixes

* nit

---------

Signed-off-by: Haytham Abuelfutuh <[email protected]>
Co-authored-by: pmahindrakar-oss <[email protected]>
  • Loading branch information
EngHabu and pmahindrakar-oss authored May 20, 2024
1 parent 975fe4d commit 67e916a
Show file tree
Hide file tree
Showing 38 changed files with 400 additions and 142 deletions.
2 changes: 1 addition & 1 deletion cacheservice/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ require (
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/flyteorg/stow v0.3.8 // indirect
github.com/flyteorg/stow v0.3.10 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions cacheservice/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ
github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4=
github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8=
github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
Expand Down
2 changes: 1 addition & 1 deletion datacatalog/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/flyteorg/stow v0.3.8 // indirect
github.com/flyteorg/stow v0.3.10 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-gormigrate/gormigrate/v2 v2.1.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions datacatalog/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ
github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4=
github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8=
github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
Expand Down
2 changes: 1 addition & 1 deletion fasttask/plugin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/flyteorg/flyte/flytepropeller v0.0.0-00010101000000-000000000000 // indirect
github.com/flyteorg/stow v0.3.8 // indirect
github.com/flyteorg/stow v0.3.10 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down
4 changes: 2 additions & 2 deletions fasttask/plugin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ
github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4=
github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8=
github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
Expand Down
6 changes: 2 additions & 4 deletions fasttask/plugin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,10 @@ func (f *FastTaskService) Heartbeat(stream pb.FastTask_HeartbeatServer) error {
// if taskStatus is complete then enqueueOwner for fast feedback
phase := core.Phase(taskStatus.GetPhase())
if phase == core.PhaseSuccess || phase == core.PhaseRetryableFailure {
ownerID := types.NamespacedName{
if err := f.enqueueOwner(types.NamespacedName{
Namespace: taskStatus.GetNamespace(),
Name: taskStatus.GetWorkflowId(),
}

if err := f.enqueueOwner(ownerID); err != nil {
}); err != nil {
logger.Warnf(context.Background(), "failed to enqueue owner for task %s: %+v", taskStatus.GetTaskId(), err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/flyteorg/flyte/flyteplugins v1.10.7-0.20240104005944-64548962d375
github.com/flyteorg/flyte/flytepropeller v1.10.7-0.20240104005944-64548962d375
github.com/flyteorg/flyte/flytestdlib v1.10.7-0.20240104005944-64548962d375
github.com/flyteorg/stow v0.3.8
github.com/flyteorg/stow v0.3.10
github.com/ghodss/yaml v1.0.1-0.20220118164431-d8423dcdf344
github.com/go-gormigrate/gormigrate/v2 v2.1.1
github.com/golang-jwt/jwt/v4 v4.5.0
Expand Down
4 changes: 2 additions & 2 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4=
github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8=
github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
Expand Down
2 changes: 1 addition & 1 deletion flytecopilot/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/flyteorg/stow v0.3.8 // indirect
github.com/flyteorg/stow v0.3.10 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
Expand Down
4 changes: 2 additions & 2 deletions flytecopilot/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ
github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4=
github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8=
github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
Expand Down
62 changes: 51 additions & 11 deletions flyteidl/clients/go/admin/auth_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,23 @@ import (
"fmt"
"net/http"

"github.com/flyteorg/flyte/flyteidl/clients/go/admin/cache"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service"
"github.com/flyteorg/flyte/flytestdlib/logger"
"golang.org/x/oauth2"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"google.golang.org/grpc"
"github.com/flyteorg/flyte/flyteidl/clients/go/admin/cache"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

const ProxyAuthorizationHeader = "proxy-authorization"

// MaterializeCredentials will attempt to build a TokenSource given the anonymously available information exposed by the server.
// Once established, it'll invoke PerRPCCredentialsFuture.Store() on perRPCCredentials to populate it with the appropriate values.
func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, perRPCCredentials *PerRPCCredentialsFuture, proxyCredentialsFuture *PerRPCCredentialsFuture) error {
func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache,
perRPCCredentials *PerRPCCredentialsFuture, proxyCredentialsFuture *PerRPCCredentialsFuture) error {

authMetadataClient, err := InitializeAuthMetadataClient(ctx, cfg, proxyCredentialsFuture)
if err != nil {
return fmt.Errorf("failed to initialized Auth Metadata Client. Error: %w", err)
Expand All @@ -43,11 +44,17 @@ func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.T

tokenSource, err := tokenSourceProvider.GetTokenSource(ctx)
if err != nil {
return err
return fmt.Errorf("failed to get token source. Error: %w", err)
}

_, err = tokenSource.Token()
if err != nil {
return fmt.Errorf("failed to issue token. Error: %w", err)
}

wrappedTokenSource := NewCustomHeaderTokenSource(tokenSource, cfg.UseInsecureConnection, authorizationMetadataKey)
perRPCCredentials.Store(wrappedTokenSource)

return nil
}

Expand Down Expand Up @@ -135,17 +142,49 @@ func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFut
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx = setHTTPClientContext(ctx, cfg, proxyCredentialsFuture)

// If there is already a token in the cache (e.g. key-ring), we should use it immediately...
t, _ := tokenCache.GetToken()
if t != nil {
err := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture)
if err != nil {
return fmt.Errorf("failed to materialize credentials. Error: %v", err)
}
}

err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
logger.Debugf(ctx, "Request failed due to [%v]. If it's an unauthenticated error, we will attempt to establish an authenticated context.", err)

if st, ok := status.FromError(err); ok {
// If the error we receive from executing the request expects
if shouldAttemptToAuthenticate(st.Code()) {
logger.Debugf(ctx, "Request failed due to [%v]. Attempting to establish an authenticated connection and trying again.", st.Code())
newErr := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture)
if newErr != nil {
return fmt.Errorf("authentication error! Original Error: %v, Auth Error: %w", err, newErr)
err = func() error {
if !tokenCache.TryLock() {
tokenCache.CondWait()
return nil
}

defer tokenCache.Unlock()
_, err := tokenCache.PurgeIfEquals(t)
if err != nil && !errors.Is(err, cache.ErrNotFound) {
logger.Errorf(ctx, "Failed to purge cache. Error [%v]", err)
return fmt.Errorf("failed to purge cache. Error: %w", err)
}

logger.Debugf(ctx, "Request failed due to [%v]. Attempting to establish an authenticated connection and trying again.", st.Code())
newErr := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture)
if newErr != nil {
errString := fmt.Sprintf("authentication error! Original Error: %v, Auth Error: %v", err, newErr)
logger.Errorf(ctx, errString)
return fmt.Errorf(errString)
}

tokenCache.CondBroadcast()
return nil
}()

if err != nil {
return err
}

return invoker(ctx, method, req, reply, cc, opts...)
Expand All @@ -168,6 +207,7 @@ func NewProxyAuthInterceptor(cfg *Config, proxyCredentialsFuture *PerRPCCredenti
}
return invoker(ctx, method, req, reply, cc, opts...)
}

return err
}
}
Loading

0 comments on commit 67e916a

Please sign in to comment.