Skip to content

Commit

Permalink
Revert "Revert "Ensure token is refreshed on Unauthenticated (#239)" … (
Browse files Browse the repository at this point in the history
#294)

…(#292)"

This reverts commit ad19bf1.

## Overview
Fixes the racy behavior in auth token cache using mutexes and condition variables.

In a nutshell, the change does the following.

* Allows all parallel token requesters to try to acquire a lock
* One of the requester gets the lock and rest get waitlisted and wait for notification from the requester who acquired the lock
* The requester who acquired the lock refreshes the token and save to the token cache
* He also notifies the waitlisted requesters to now go and read the saved token cache and proceed with the request

The test plan below summarizes how this was tested

## Test Plan

https://buildkite.com/unionai/org-staging-sync/builds/2311
https://buildkite.com/unionai/managed-cluster-staging-sync/builds/3461


Serverless onboarding workshop 
https://buildkite.com/unionai/serverless-workshop-staging-deploy-and-run/builds/44#018fa797-269b-4c08-9809-f6561f8af777

This was tested locally aswell using integration test and was ran using 20 parallel routines
```
go test -v -tags=integration -run '^TestAPISuite$/^Test_RandomNameWorkflow$'
```

* All routines get unauthenticated in the first attempt of using inmemory cached token provider
```
cat t |grep "If it's an unauthenticated error, we will attempt to establish an authenticated context" |wc -l
      20
```
*  All routines try to lock
```
cat t |grep "try lock"  |wc -l                                                         python
      20
```
*  One of the routine is able to acquire the lock
```
cat t |grep "Locked : true"  |wc -l                                                    python
       1
```

* Rest fail to lock
```
cat t |grep "Locked : false"  |wc -l                                                   python
      19
```
* The ones who fail to lock go into waiting state . i.e they get added to condition variables waitlist

```
cat t |grep "Waiting" |grep -v "Coming" |wc -l                                         python
      19
```

eg : log for ref
```
{"json":{"src":"token_cache_inmemory.go:77"},"level":"info","msg":"Coming out of Waiting","ts":"2024-05-22T23:52:47-07:00"}
```

* The one who has locked goes and saves the token
```
cat t |grep "Saved"  |wc -l                                                            python
       1
```

* Broadcasts to all the others waiting using single notification which notifies all the waiters.
```
cat t |grep "Broadcasted"  |wc -l                                                      python
       1
```

* Post which they come out of waiting
```
 cat t |grep "Coming out of Waiting"|wc -l                                              python
      19
```

And go ahead and read the token


Log file for reference

[t.txt](https://github.com/unionai/flyte/files/15413364/t.txt)

## Rollout Plan (if applicable)
Merge this change and merge the cloud PR 

## Upstream Changes
Should this change be upstreamed to OSS (flyteorg/flyte)? If so, please check this box for auditing. Note, this is the responsibility of each developer. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).

- [X] To be upstreamed

## Issue
*TODO: Link Linear issue(s) using [magic words](https://linear.app/docs/github#magic-words). `fixes` will move to merged status, while `ref` will only link the PR.*

## Checklist
* [ ] Added tests
* [ ] Ran a deploy dry run and shared the terraform plan
* [ ] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [ ] Updated documentation
  • Loading branch information
pmahindrakar-oss authored May 24, 2024
1 parent 092dfe8 commit 99ebd79
Show file tree
Hide file tree
Showing 38 changed files with 430 additions and 142 deletions.
2 changes: 1 addition & 1 deletion cacheservice/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ require (
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/flyteorg/stow v0.3.8 // indirect
github.com/flyteorg/stow v0.3.10 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions cacheservice/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ
github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4=
github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8=
github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
Expand Down
2 changes: 1 addition & 1 deletion datacatalog/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/flyteorg/stow v0.3.8 // indirect
github.com/flyteorg/stow v0.3.10 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-gormigrate/gormigrate/v2 v2.1.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions datacatalog/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ
github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4=
github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8=
github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
Expand Down
2 changes: 1 addition & 1 deletion fasttask/plugin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/flyteorg/flyte/flytepropeller v0.0.0-00010101000000-000000000000 // indirect
github.com/flyteorg/stow v0.3.8 // indirect
github.com/flyteorg/stow v0.3.10 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down
4 changes: 2 additions & 2 deletions fasttask/plugin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ
github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4=
github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8=
github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
Expand Down
6 changes: 2 additions & 4 deletions fasttask/plugin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,10 @@ func (f *fastTaskServiceImpl) Heartbeat(stream pb.FastTask_HeartbeatServer) erro
// if taskStatus is complete then enqueueOwner for fast feedback
phase := core.Phase(taskStatus.GetPhase())
if phase == core.PhaseSuccess || phase == core.PhaseRetryableFailure {
ownerID := types.NamespacedName{
if err := f.enqueueOwner(types.NamespacedName{
Namespace: taskStatus.GetNamespace(),
Name: taskStatus.GetWorkflowId(),
}

if err := f.enqueueOwner(ownerID); err != nil {
}); err != nil {
logger.Warnf(context.Background(), "failed to enqueue owner for task %s: %+v", taskStatus.GetTaskId(), err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/flyteorg/flyte/flyteplugins v1.10.7-0.20240104005944-64548962d375
github.com/flyteorg/flyte/flytepropeller v1.10.7-0.20240104005944-64548962d375
github.com/flyteorg/flyte/flytestdlib v1.10.7-0.20240104005944-64548962d375
github.com/flyteorg/stow v0.3.8
github.com/flyteorg/stow v0.3.10
github.com/ghodss/yaml v1.0.1-0.20220118164431-d8423dcdf344
github.com/go-gormigrate/gormigrate/v2 v2.1.1
github.com/golang-jwt/jwt/v4 v4.5.0
Expand Down
4 changes: 2 additions & 2 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4=
github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8=
github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
Expand Down
2 changes: 1 addition & 1 deletion flytecopilot/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/flyteorg/stow v0.3.8 // indirect
github.com/flyteorg/stow v0.3.10 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
Expand Down
4 changes: 2 additions & 2 deletions flytecopilot/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ
github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4=
github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8=
github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
Expand Down
61 changes: 50 additions & 11 deletions flyteidl/clients/go/admin/auth_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,23 @@ import (
"fmt"
"net/http"

"github.com/flyteorg/flyte/flyteidl/clients/go/admin/cache"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service"
"github.com/flyteorg/flyte/flytestdlib/logger"
"golang.org/x/oauth2"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"google.golang.org/grpc"
"github.com/flyteorg/flyte/flyteidl/clients/go/admin/cache"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service"
"github.com/flyteorg/flyte/flytestdlib/logger"
)

const ProxyAuthorizationHeader = "proxy-authorization"

// MaterializeCredentials will attempt to build a TokenSource given the anonymously available information exposed by the server.
// Once established, it'll invoke PerRPCCredentialsFuture.Store() on perRPCCredentials to populate it with the appropriate values.
func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, perRPCCredentials *PerRPCCredentialsFuture, proxyCredentialsFuture *PerRPCCredentialsFuture) error {
func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache,
perRPCCredentials *PerRPCCredentialsFuture, proxyCredentialsFuture *PerRPCCredentialsFuture) error {

authMetadataClient, err := InitializeAuthMetadataClient(ctx, cfg, proxyCredentialsFuture)
if err != nil {
return fmt.Errorf("failed to initialized Auth Metadata Client. Error: %w", err)
Expand All @@ -43,11 +44,17 @@ func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.T

tokenSource, err := tokenSourceProvider.GetTokenSource(ctx)
if err != nil {
return err
return fmt.Errorf("failed to get token source. Error: %w", err)
}

_, err = tokenSource.Token()
if err != nil {
return fmt.Errorf("failed to issue token. Error: %w", err)
}

wrappedTokenSource := NewCustomHeaderTokenSource(tokenSource, cfg.UseInsecureConnection, authorizationMetadataKey)
perRPCCredentials.Store(wrappedTokenSource)

return nil
}

Expand Down Expand Up @@ -135,17 +142,48 @@ func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFut
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx = setHTTPClientContext(ctx, cfg, proxyCredentialsFuture)

// If there is already a token in the cache (e.g. key-ring), we should use it immediately...
t, _ := tokenCache.GetToken()
if t != nil {
err := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture)
if err != nil {
return fmt.Errorf("failed to materialize credentials. Error: %v", err)
}
}

err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
logger.Debugf(ctx, "Request failed due to [%v]. If it's an unauthenticated error, we will attempt to establish an authenticated context.", err)

if st, ok := status.FromError(err); ok {
// If the error we receive from executing the request expects
if shouldAttemptToAuthenticate(st.Code()) {
logger.Debugf(ctx, "Request failed due to [%v]. Attempting to establish an authenticated connection and trying again.", st.Code())
newErr := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture)
if newErr != nil {
return fmt.Errorf("authentication error! Original Error: %v, Auth Error: %w", err, newErr)
err = func() error {
if !tokenCache.TryLock() {
tokenCache.CondWait()
return nil
}
defer tokenCache.Unlock()
_, err := tokenCache.PurgeIfEquals(t)
if err != nil && !errors.Is(err, cache.ErrNotFound) {
logger.Errorf(ctx, "Failed to purge cache. Error [%v]", err)
return fmt.Errorf("failed to purge cache. Error: %w", err)
}

logger.Debugf(ctx, "Request failed due to [%v]. Attempting to establish an authenticated connection and trying again.", st.Code())
newErr := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture)
if newErr != nil {
errString := fmt.Sprintf("authentication error! Original Error: %v, Auth Error: %v", err, newErr)
logger.Errorf(ctx, errString)
return fmt.Errorf(errString)
}

tokenCache.CondBroadcast()
return nil
}()

if err != nil {
return err
}

return invoker(ctx, method, req, reply, cc, opts...)
Expand All @@ -168,6 +206,7 @@ func NewProxyAuthInterceptor(cfg *Config, proxyCredentialsFuture *PerRPCCredenti
}
return invoker(ctx, method, req, reply, cc, opts...)
}

return err
}
}
Loading

0 comments on commit 99ebd79

Please sign in to comment.