From 67e916a084e6c066257db945e121a6d3bedb92e4 Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Mon, 20 May 2024 09:13:14 -0700 Subject: [PATCH] Ensure token is refreshed on Unauthenticated (#239) * Ensure token is refreshed on Unauthenticated Signed-off-by: Haytham Abuelfutuh * Cleanup Signed-off-by: Haytham Abuelfutuh * Add lock/unlock in BaseTokenOrchestrator as well Signed-off-by: Haytham Abuelfutuh * generate Signed-off-by: Haytham Abuelfutuh * Unit tests Signed-off-by: Haytham Abuelfutuh * purge the token cache only once Signed-off-by: Haytham Abuelfutuh * revert Signed-off-by: Haytham Abuelfutuh * Update Purge to PurgeIfEquals Signed-off-by: Haytham Abuelfutuh * Use cached token if one exists Signed-off-by: Haytham Abuelfutuh * lock once Signed-off-by: Haytham Abuelfutuh * Implement Condition Variable to avoid serializing auth requests Signed-off-by: Haytham Abuelfutuh * Adding typed error and upgrading stow * wip Signed-off-by: Haytham Abuelfutuh * updating go.sum * updating go.sum for all services with latest stow version * fix some tests * fix unit tests * fix stow unit tests and linter fixes * merge conflict fixes * nit --------- Signed-off-by: Haytham Abuelfutuh Co-authored-by: pmahindrakar-oss --- cacheservice/go.mod | 2 +- cacheservice/go.sum | 4 +- datacatalog/go.mod | 2 +- datacatalog/go.sum | 4 +- fasttask/plugin/go.mod | 2 +- fasttask/plugin/go.sum | 4 +- fasttask/plugin/service.go | 6 +- flyteadmin/go.mod | 2 +- flyteadmin/go.sum | 4 +- flytecopilot/go.mod | 2 +- flytecopilot/go.sum | 4 +- flyteidl/clients/go/admin/auth_interceptor.go | 62 ++++++-- .../clients/go/admin/auth_interceptor_test.go | 146 ++++++++++++------ .../go/admin/cache/mocks/token_cache.go | 91 +++++++++++ .../clients/go/admin/cache/token_cache.go | 29 +++- .../go/admin/cache/token_cache_inmemory.go | 49 +++++- flyteidl/clients/go/admin/client_builder.go | 2 +- .../clients/go/admin/client_builder_test.go | 4 +- flyteidl/clients/go/admin/client_test.go | 4 +- .../deviceflow/token_orchestrator_test.go | 4 +- .../admin/pkce/auth_flow_orchestrator_test.go | 2 +- .../clients/go/admin/token_source_provider.go | 8 +- .../go/admin/token_source_provider_test.go | 4 +- .../base_token_orchestrator.go | 21 ++- .../base_token_orchestrator_test.go | 8 +- flyteidl/clients/go/assets/admin.swagger.json | 34 ++-- flyteidl/go.mod | 2 +- flyteidl/go.sum | 4 +- flyteplugins/go.mod | 2 +- flyteplugins/go.sum | 4 +- flytepropeller/go.mod | 2 +- flytepropeller/go.sum | 4 +- flytestdlib/go.mod | 2 +- flytestdlib/go.sum | 4 +- flytestdlib/storage/stow_store.go | 4 +- flytestdlib/storage/stow_store_test.go | 4 +- go.mod | 2 +- go.sum | 4 +- 38 files changed, 400 insertions(+), 142 deletions(-) diff --git a/cacheservice/go.mod b/cacheservice/go.mod index d5676ba50e0..c7e8a7625ab 100644 --- a/cacheservice/go.mod +++ b/cacheservice/go.mod @@ -81,7 +81,7 @@ require ( github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/fatih/color v1.13.0 // indirect - github.com/flyteorg/stow v0.3.8 // indirect + github.com/flyteorg/stow v0.3.10 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-logr/logr v1.4.1 // indirect diff --git a/cacheservice/go.sum b/cacheservice/go.sum index b7e2b841b9f..dcd2862f732 100644 --- a/cacheservice/go.sum +++ b/cacheservice/go.sum @@ -149,8 +149,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= -github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= +github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/datacatalog/go.mod b/datacatalog/go.mod index a23ceb162bd..56b004fd744 100644 --- a/datacatalog/go.mod +++ b/datacatalog/go.mod @@ -44,7 +44,7 @@ require ( github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/fatih/color v1.13.0 // indirect - github.com/flyteorg/stow v0.3.8 // indirect + github.com/flyteorg/stow v0.3.10 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-gormigrate/gormigrate/v2 v2.1.1 // indirect diff --git a/datacatalog/go.sum b/datacatalog/go.sum index 5a1c875f859..6ab73acf0d1 100644 --- a/datacatalog/go.sum +++ b/datacatalog/go.sum @@ -113,8 +113,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= -github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= +github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/fasttask/plugin/go.mod b/fasttask/plugin/go.mod index 2484572ccef..441ad8cc095 100644 --- a/fasttask/plugin/go.mod +++ b/fasttask/plugin/go.mod @@ -43,7 +43,7 @@ require ( github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/fatih/color v1.13.0 // indirect github.com/flyteorg/flyte/flytepropeller v0.0.0-00010101000000-000000000000 // indirect - github.com/flyteorg/stow v0.3.8 // indirect + github.com/flyteorg/stow v0.3.10 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/fasttask/plugin/go.sum b/fasttask/plugin/go.sum index 108f2187b71..affebdbb635 100644 --- a/fasttask/plugin/go.sum +++ b/fasttask/plugin/go.sum @@ -62,8 +62,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= -github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= +github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/fasttask/plugin/service.go b/fasttask/plugin/service.go index 6edf3ad38b4..f94fb29acc7 100644 --- a/fasttask/plugin/service.go +++ b/fasttask/plugin/service.go @@ -173,12 +173,10 @@ func (f *FastTaskService) Heartbeat(stream pb.FastTask_HeartbeatServer) error { // if taskStatus is complete then enqueueOwner for fast feedback phase := core.Phase(taskStatus.GetPhase()) if phase == core.PhaseSuccess || phase == core.PhaseRetryableFailure { - ownerID := types.NamespacedName{ + if err := f.enqueueOwner(types.NamespacedName{ Namespace: taskStatus.GetNamespace(), Name: taskStatus.GetWorkflowId(), - } - - if err := f.enqueueOwner(ownerID); err != nil { + }); err != nil { logger.Warnf(context.Background(), "failed to enqueue owner for task %s: %+v", taskStatus.GetTaskId(), err) } } diff --git a/flyteadmin/go.mod b/flyteadmin/go.mod index 3632bb22440..7c1f3f14edd 100644 --- a/flyteadmin/go.mod +++ b/flyteadmin/go.mod @@ -17,7 +17,7 @@ require ( github.com/flyteorg/flyte/flyteplugins v1.10.7-0.20240104005944-64548962d375 github.com/flyteorg/flyte/flytepropeller v1.10.7-0.20240104005944-64548962d375 github.com/flyteorg/flyte/flytestdlib v1.10.7-0.20240104005944-64548962d375 - github.com/flyteorg/stow v0.3.8 + github.com/flyteorg/stow v0.3.10 github.com/ghodss/yaml v1.0.1-0.20220118164431-d8423dcdf344 github.com/go-gormigrate/gormigrate/v2 v2.1.1 github.com/golang-jwt/jwt/v4 v4.5.0 diff --git a/flyteadmin/go.sum b/flyteadmin/go.sum index 473db1d4bab..6f7c9ccce75 100644 --- a/flyteadmin/go.sum +++ b/flyteadmin/go.sum @@ -246,8 +246,8 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= -github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= +github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= diff --git a/flytecopilot/go.mod b/flytecopilot/go.mod index 4832ec9177d..21c3c1f12cd 100644 --- a/flytecopilot/go.mod +++ b/flytecopilot/go.mod @@ -40,7 +40,7 @@ require ( github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/fatih/color v1.13.0 // indirect - github.com/flyteorg/stow v0.3.8 // indirect + github.com/flyteorg/stow v0.3.10 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect diff --git a/flytecopilot/go.sum b/flytecopilot/go.sum index eaee1f12ac2..f18b1731264 100644 --- a/flytecopilot/go.sum +++ b/flytecopilot/go.sum @@ -103,8 +103,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= -github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= +github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/flyteidl/clients/go/admin/auth_interceptor.go b/flyteidl/clients/go/admin/auth_interceptor.go index 09391df67ae..a5cdeab4ef7 100644 --- a/flyteidl/clients/go/admin/auth_interceptor.go +++ b/flyteidl/clients/go/admin/auth_interceptor.go @@ -6,22 +6,23 @@ import ( "fmt" "net/http" - "github.com/flyteorg/flyte/flyteidl/clients/go/admin/cache" - "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service" - "github.com/flyteorg/flyte/flytestdlib/logger" "golang.org/x/oauth2" - + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "google.golang.org/grpc" + "github.com/flyteorg/flyte/flyteidl/clients/go/admin/cache" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service" + "github.com/flyteorg/flyte/flytestdlib/logger" ) const ProxyAuthorizationHeader = "proxy-authorization" // MaterializeCredentials will attempt to build a TokenSource given the anonymously available information exposed by the server. // Once established, it'll invoke PerRPCCredentialsFuture.Store() on perRPCCredentials to populate it with the appropriate values. -func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, perRPCCredentials *PerRPCCredentialsFuture, proxyCredentialsFuture *PerRPCCredentialsFuture) error { +func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, + perRPCCredentials *PerRPCCredentialsFuture, proxyCredentialsFuture *PerRPCCredentialsFuture) error { + authMetadataClient, err := InitializeAuthMetadataClient(ctx, cfg, proxyCredentialsFuture) if err != nil { return fmt.Errorf("failed to initialized Auth Metadata Client. Error: %w", err) @@ -43,11 +44,17 @@ func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.T tokenSource, err := tokenSourceProvider.GetTokenSource(ctx) if err != nil { - return err + return fmt.Errorf("failed to get token source. Error: %w", err) + } + + _, err = tokenSource.Token() + if err != nil { + return fmt.Errorf("failed to issue token. Error: %w", err) } wrappedTokenSource := NewCustomHeaderTokenSource(tokenSource, cfg.UseInsecureConnection, authorizationMetadataKey) perRPCCredentials.Store(wrappedTokenSource) + return nil } @@ -135,6 +142,15 @@ func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFut return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { ctx = setHTTPClientContext(ctx, cfg, proxyCredentialsFuture) + // If there is already a token in the cache (e.g. key-ring), we should use it immediately... + t, _ := tokenCache.GetToken() + if t != nil { + err := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture) + if err != nil { + return fmt.Errorf("failed to materialize credentials. Error: %v", err) + } + } + err := invoker(ctx, method, req, reply, cc, opts...) if err != nil { logger.Debugf(ctx, "Request failed due to [%v]. If it's an unauthenticated error, we will attempt to establish an authenticated context.", err) @@ -142,10 +158,33 @@ func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFut if st, ok := status.FromError(err); ok { // If the error we receive from executing the request expects if shouldAttemptToAuthenticate(st.Code()) { - logger.Debugf(ctx, "Request failed due to [%v]. Attempting to establish an authenticated connection and trying again.", st.Code()) - newErr := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture) - if newErr != nil { - return fmt.Errorf("authentication error! Original Error: %v, Auth Error: %w", err, newErr) + err = func() error { + if !tokenCache.TryLock() { + tokenCache.CondWait() + return nil + } + + defer tokenCache.Unlock() + _, err := tokenCache.PurgeIfEquals(t) + if err != nil && !errors.Is(err, cache.ErrNotFound) { + logger.Errorf(ctx, "Failed to purge cache. Error [%v]", err) + return fmt.Errorf("failed to purge cache. Error: %w", err) + } + + logger.Debugf(ctx, "Request failed due to [%v]. Attempting to establish an authenticated connection and trying again.", st.Code()) + newErr := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture) + if newErr != nil { + errString := fmt.Sprintf("authentication error! Original Error: %v, Auth Error: %v", err, newErr) + logger.Errorf(ctx, errString) + return fmt.Errorf(errString) + } + + tokenCache.CondBroadcast() + return nil + }() + + if err != nil { + return err } return invoker(ctx, method, req, reply, cc, opts...) @@ -168,6 +207,7 @@ func NewProxyAuthInterceptor(cfg *Config, proxyCredentialsFuture *PerRPCCredenti } return invoker(ctx, method, req, reply, cc, opts...) } + return err } } diff --git a/flyteidl/clients/go/admin/auth_interceptor_test.go b/flyteidl/clients/go/admin/auth_interceptor_test.go index ce99c992708..10c96625b78 100644 --- a/flyteidl/clients/go/admin/auth_interceptor_test.go +++ b/flyteidl/clients/go/admin/auth_interceptor_test.go @@ -2,13 +2,14 @@ package admin import ( "context" + "encoding/json" "errors" "fmt" "io" "net" "net/http" - "net/http/httptest" "net/url" + "os" "strings" "sync" "testing" @@ -31,10 +32,11 @@ import ( // authMetadataServer is a fake AuthMetadataServer that takes in an AuthMetadataServer implementation (usually one // initialized through mockery) and starts a local server that uses it to respond to grpc requests. type authMetadataServer struct { - s *httptest.Server t testing.TB - port int + grpcPort int + httpPort int grpcServer *grpc.Server + httpServer *http.Server netListener net.Listener impl service.AuthMetadataServiceServer lck *sync.RWMutex @@ -70,27 +72,49 @@ func (s authMetadataServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.NotFound(w, r) } +func (s *authMetadataServer) tokenHandler(w http.ResponseWriter, r *http.Request) { + tokenJSON := []byte(`{"access_token": "exampletoken", "token_type": "bearer"}`) + w.Header().Set("Content-Type", "application/json") + _, err := w.Write(tokenJSON) + assert.NoError(s.t, err) +} + func (s *authMetadataServer) Start(_ context.Context) error { s.lck.Lock() defer s.lck.Unlock() /***** Set up the server serving channelz service. *****/ - lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", s.port)) + + lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", s.grpcPort)) if err != nil { - return fmt.Errorf("failed to listen on port [%v]: %w", s.port, err) + return fmt.Errorf("failed to listen on port [%v]: %w", s.grpcPort, err) } + s.netListener = lis grpcS := grpc.NewServer() service.RegisterAuthMetadataServiceServer(grpcS, s) go func() { + defer grpcS.Stop() _ = grpcS.Serve(lis) - //assert.NoError(s.t, err) }() - s.grpcServer = grpcS - s.netListener = lis + mux := http.NewServeMux() + // Attach the handler to the /oauth2/token path + mux.HandleFunc("/oauth2/token", s.tokenHandler) + + //nolint:gosec + s.httpServer = &http.Server{ + Addr: fmt.Sprintf("localhost:%d", s.httpPort), + Handler: mux, + } - s.s = httptest.NewServer(s) + go func() { + defer s.httpServer.Close() + err := s.httpServer.ListenAndServe() + if err != nil { + panic(err) + } + }() return nil } @@ -98,25 +122,30 @@ func (s *authMetadataServer) Start(_ context.Context) error { func (s *authMetadataServer) Close() { s.lck.RLock() defer s.lck.RUnlock() - s.grpcServer.Stop() - s.s.Close() } -func newAuthMetadataServer(t testing.TB, port int, impl service.AuthMetadataServiceServer) *authMetadataServer { +func newAuthMetadataServer(t testing.TB, grpcPort int, httpPort int, impl service.AuthMetadataServiceServer) *authMetadataServer { return &authMetadataServer{ - port: port, - t: t, - impl: impl, - lck: &sync.RWMutex{}, + grpcPort: grpcPort, + httpPort: httpPort, + t: t, + impl: impl, + lck: &sync.RWMutex{}, } } func Test_newAuthInterceptor(t *testing.T) { + plan, _ := os.ReadFile("tokenorchestrator/testdata/token.json") + var tokenData oauth2.Token + err := json.Unmarshal(plan, &tokenData) + assert.NoError(t, err) t.Run("Other Error", func(t *testing.T) { f := NewPerRPCCredentialsFuture() p := NewPerRPCCredentialsFuture() - interceptor := NewAuthInterceptor(&Config{}, &mocks.TokenCache{}, f, p) + mockTokenCache := &mocks.TokenCache{} + mockTokenCache.OnGetTokenMatch().Return(&tokenData, nil) + interceptor := NewAuthInterceptor(&Config{}, mockTokenCache, f, p) otherError := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { return status.New(codes.Canceled, "").Err() } @@ -129,35 +158,43 @@ func Test_newAuthInterceptor(t *testing.T) { Level: logger.DebugLevel, })) - port := rand.IntnRange(10000, 60000) + httpPort := rand.IntnRange(10000, 60000) + grpcPort := rand.IntnRange(10000, 60000) m := &adminMocks.AuthMetadataServiceServer{} m.OnGetOAuth2MetadataMatch(mock.Anything, mock.Anything).Return(&service.OAuth2MetadataResponse{ - AuthorizationEndpoint: fmt.Sprintf("http://localhost:%d/oauth2/authorize", port), - TokenEndpoint: fmt.Sprintf("http://localhost:%d/oauth2/token", port), - JwksUri: fmt.Sprintf("http://localhost:%d/oauth2/jwks", port), + AuthorizationEndpoint: fmt.Sprintf("http://localhost:%d/oauth2/authorize", httpPort), + TokenEndpoint: fmt.Sprintf("http://localhost:%d/oauth2/token", httpPort), + JwksUri: fmt.Sprintf("http://localhost:%d/oauth2/jwks", httpPort), }, nil) + m.OnGetPublicClientConfigMatch(mock.Anything, mock.Anything).Return(&service.PublicClientAuthConfigResponse{ Scopes: []string{"all"}, }, nil) - s := newAuthMetadataServer(t, port, m) + s := newAuthMetadataServer(t, grpcPort, httpPort, m) ctx := context.Background() assert.NoError(t, s.Start(ctx)) defer s.Close() - u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", port)) + u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", grpcPort)) assert.NoError(t, err) f := NewPerRPCCredentialsFuture() p := NewPerRPCCredentialsFuture() + c := &mocks.TokenCache{} + c.OnGetTokenMatch().Return(nil, nil) + c.OnTryLockMatch().Return(true) + c.OnSaveTokenMatch(mock.Anything).Return(nil) + c.On("CondBroadcast").Return() + c.On("Unlock").Return() + c.OnPurgeIfEqualsMatch(mock.Anything).Return(true, nil) interceptor := NewAuthInterceptor(&Config{ Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, AuthType: AuthTypeClientSecret, - }, &mocks.TokenCache{}, f, p) + }, c, f, p) unauthenticated := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { return status.New(codes.Unauthenticated, "").Err() } - err = interceptor(ctx, "POST", nil, nil, nil, unauthenticated) assert.Error(t, err) assert.Truef(t, f.IsInitialized(), "PerRPCCredentialFuture should be initialized") @@ -169,24 +206,26 @@ func Test_newAuthInterceptor(t *testing.T) { Level: logger.DebugLevel, })) - port := rand.IntnRange(10000, 60000) + httpPort := rand.IntnRange(10000, 60000) + grpcPort := rand.IntnRange(10000, 60000) m := &adminMocks.AuthMetadataServiceServer{} - s := newAuthMetadataServer(t, port, m) + s := newAuthMetadataServer(t, grpcPort, httpPort, m) ctx := context.Background() assert.NoError(t, s.Start(ctx)) defer s.Close() - u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", port)) + u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", grpcPort)) assert.NoError(t, err) f := NewPerRPCCredentialsFuture() p := NewPerRPCCredentialsFuture() - + c := &mocks.TokenCache{} + c.OnGetTokenMatch().Return(nil, nil) interceptor := NewAuthInterceptor(&Config{ Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, AuthType: AuthTypeClientSecret, - }, &mocks.TokenCache{}, f, p) + }, c, f, p) authenticated := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { return nil } @@ -201,33 +240,39 @@ func Test_newAuthInterceptor(t *testing.T) { Level: logger.DebugLevel, })) - port := rand.IntnRange(10000, 60000) + httpPort := rand.IntnRange(10000, 60000) + grpcPort := rand.IntnRange(10000, 60000) m := &adminMocks.AuthMetadataServiceServer{} m.OnGetOAuth2MetadataMatch(mock.Anything, mock.Anything).Return(&service.OAuth2MetadataResponse{ - AuthorizationEndpoint: fmt.Sprintf("http://localhost:%d/oauth2/authorize", port), - TokenEndpoint: fmt.Sprintf("http://localhost:%d/oauth2/token", port), - JwksUri: fmt.Sprintf("http://localhost:%d/oauth2/jwks", port), + AuthorizationEndpoint: fmt.Sprintf("http://localhost:%d/oauth2/authorize", httpPort), + TokenEndpoint: fmt.Sprintf("http://localhost:%d/oauth2/token", httpPort), + JwksUri: fmt.Sprintf("http://localhost:%d/oauth2/jwks", httpPort), }, nil) m.OnGetPublicClientConfigMatch(mock.Anything, mock.Anything).Return(&service.PublicClientAuthConfigResponse{ Scopes: []string{"all"}, }, nil) - s := newAuthMetadataServer(t, port, m) + s := newAuthMetadataServer(t, grpcPort, httpPort, m) ctx := context.Background() assert.NoError(t, s.Start(ctx)) defer s.Close() - u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", port)) + u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", grpcPort)) assert.NoError(t, err) f := NewPerRPCCredentialsFuture() p := NewPerRPCCredentialsFuture() + c := &mocks.TokenCache{} + c.OnGetTokenMatch().Return(nil, nil) + c.OnTryLockMatch().Return(true) + c.OnSaveTokenMatch(mock.Anything).Return(nil) + c.OnPurgeIfEqualsMatch(mock.Anything).Return(true, nil) interceptor := NewAuthInterceptor(&Config{ Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, AuthType: AuthTypeClientSecret, - }, &mocks.TokenCache{}, f, p) + }, c, f, p) unauthenticated := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { return status.New(codes.Aborted, "").Err() } @@ -239,17 +284,21 @@ func Test_newAuthInterceptor(t *testing.T) { } func TestMaterializeCredentials(t *testing.T) { - port := rand.IntnRange(10000, 60000) t.Run("No oauth2 metadata endpoint or Public client config lookup", func(t *testing.T) { + httpPort := rand.IntnRange(10000, 60000) + grpcPort := rand.IntnRange(10000, 60000) + c := &mocks.TokenCache{} + c.OnGetTokenMatch().Return(nil, nil) + c.OnSaveTokenMatch(mock.Anything).Return(nil) m := &adminMocks.AuthMetadataServiceServer{} m.OnGetOAuth2MetadataMatch(mock.Anything, mock.Anything).Return(nil, errors.New("unexpected call to get oauth2 metadata")) m.OnGetPublicClientConfigMatch(mock.Anything, mock.Anything).Return(nil, errors.New("unexpected call to get public client config")) - s := newAuthMetadataServer(t, port, m) + s := newAuthMetadataServer(t, grpcPort, httpPort, m) ctx := context.Background() assert.NoError(t, s.Start(ctx)) defer s.Close() - u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", port)) + u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", grpcPort)) assert.NoError(t, err) f := NewPerRPCCredentialsFuture() @@ -259,24 +308,29 @@ func TestMaterializeCredentials(t *testing.T) { Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, AuthType: AuthTypeClientSecret, - TokenURL: fmt.Sprintf("http://localhost:%d/api/v1/token", port), + TokenURL: fmt.Sprintf("http://localhost:%d/oauth2/token", httpPort), Scopes: []string{"all"}, Audience: "http://localhost:30081", AuthorizationHeader: "authorization", - }, &mocks.TokenCache{}, f, p) + }, c, f, p) assert.NoError(t, err) }) t.Run("Failed to fetch client metadata", func(t *testing.T) { + httpPort := rand.IntnRange(10000, 60000) + grpcPort := rand.IntnRange(10000, 60000) + c := &mocks.TokenCache{} + c.OnGetTokenMatch().Return(nil, nil) + c.OnSaveTokenMatch(mock.Anything).Return(nil) m := &adminMocks.AuthMetadataServiceServer{} m.OnGetOAuth2MetadataMatch(mock.Anything, mock.Anything).Return(nil, errors.New("unexpected call to get oauth2 metadata")) failedPublicClientConfigLookup := errors.New("expected err") m.OnGetPublicClientConfigMatch(mock.Anything, mock.Anything).Return(nil, failedPublicClientConfigLookup) - s := newAuthMetadataServer(t, port, m) + s := newAuthMetadataServer(t, grpcPort, httpPort, m) ctx := context.Background() assert.NoError(t, s.Start(ctx)) defer s.Close() - u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", port)) + u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", grpcPort)) assert.NoError(t, err) f := NewPerRPCCredentialsFuture() @@ -286,9 +340,9 @@ func TestMaterializeCredentials(t *testing.T) { Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, AuthType: AuthTypeClientSecret, - TokenURL: fmt.Sprintf("http://localhost:%d/api/v1/token", port), + TokenURL: fmt.Sprintf("http://localhost:%d/api/v1/token", httpPort), Scopes: []string{"all"}, - }, &mocks.TokenCache{}, f, p) + }, c, f, p) assert.EqualError(t, err, "failed to fetch client metadata. Error: rpc error: code = Unknown desc = expected err") }) } diff --git a/flyteidl/clients/go/admin/cache/mocks/token_cache.go b/flyteidl/clients/go/admin/cache/mocks/token_cache.go index 0af58b381f8..88a1bef81cd 100644 --- a/flyteidl/clients/go/admin/cache/mocks/token_cache.go +++ b/flyteidl/clients/go/admin/cache/mocks/token_cache.go @@ -12,6 +12,16 @@ type TokenCache struct { mock.Mock } +// CondBroadcast provides a mock function with given fields: +func (_m *TokenCache) CondBroadcast() { + _m.Called() +} + +// CondWait provides a mock function with given fields: +func (_m *TokenCache) CondWait() { + _m.Called() +} + type TokenCache_GetToken struct { *mock.Call } @@ -53,6 +63,50 @@ func (_m *TokenCache) GetToken() (*oauth2.Token, error) { return r0, r1 } +// Lock provides a mock function with given fields: +func (_m *TokenCache) Lock() { + _m.Called() +} + +type TokenCache_PurgeIfEquals struct { + *mock.Call +} + +func (_m TokenCache_PurgeIfEquals) Return(_a0 bool, _a1 error) *TokenCache_PurgeIfEquals { + return &TokenCache_PurgeIfEquals{Call: _m.Call.Return(_a0, _a1)} +} + +func (_m *TokenCache) OnPurgeIfEquals(t *oauth2.Token) *TokenCache_PurgeIfEquals { + c_call := _m.On("PurgeIfEquals", t) + return &TokenCache_PurgeIfEquals{Call: c_call} +} + +func (_m *TokenCache) OnPurgeIfEqualsMatch(matchers ...interface{}) *TokenCache_PurgeIfEquals { + c_call := _m.On("PurgeIfEquals", matchers...) + return &TokenCache_PurgeIfEquals{Call: c_call} +} + +// PurgeIfEquals provides a mock function with given fields: t +func (_m *TokenCache) PurgeIfEquals(t *oauth2.Token) (bool, error) { + ret := _m.Called(t) + + var r0 bool + if rf, ok := ret.Get(0).(func(*oauth2.Token) bool); ok { + r0 = rf(t) + } else { + r0 = ret.Get(0).(bool) + } + + var r1 error + if rf, ok := ret.Get(1).(func(*oauth2.Token) error); ok { + r1 = rf(t) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + type TokenCache_SaveToken struct { *mock.Call } @@ -84,3 +138,40 @@ func (_m *TokenCache) SaveToken(token *oauth2.Token) error { return r0 } + +type TokenCache_TryLock struct { + *mock.Call +} + +func (_m TokenCache_TryLock) Return(_a0 bool) *TokenCache_TryLock { + return &TokenCache_TryLock{Call: _m.Call.Return(_a0)} +} + +func (_m *TokenCache) OnTryLock() *TokenCache_TryLock { + c_call := _m.On("TryLock") + return &TokenCache_TryLock{Call: c_call} +} + +func (_m *TokenCache) OnTryLockMatch(matchers ...interface{}) *TokenCache_TryLock { + c_call := _m.On("TryLock", matchers...) + return &TokenCache_TryLock{Call: c_call} +} + +// TryLock provides a mock function with given fields: +func (_m *TokenCache) TryLock() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Unlock provides a mock function with given fields: +func (_m *TokenCache) Unlock() { + _m.Called() +} diff --git a/flyteidl/clients/go/admin/cache/token_cache.go b/flyteidl/clients/go/admin/cache/token_cache.go index e4e2b7e17f2..f946c8bdea6 100644 --- a/flyteidl/clients/go/admin/cache/token_cache.go +++ b/flyteidl/clients/go/admin/cache/token_cache.go @@ -1,14 +1,39 @@ package cache -import "golang.org/x/oauth2" +import ( + "fmt" + "golang.org/x/oauth2" +) //go:generate mockery -all -case=underscore +var ( + ErrNotFound = fmt.Errorf("secret not found in keyring") +) + // TokenCache defines the interface needed to cache and retrieve oauth tokens. type TokenCache interface { // SaveToken saves the token securely to cache. SaveToken(token *oauth2.Token) error - // Retrieves the token from the cache. + // GetToken retrieves the token from the cache. GetToken() (*oauth2.Token, error) + + // PurgeIfEquals purges the token from the cache. + PurgeIfEquals(t *oauth2.Token) (bool, error) + + // Lock the cache. + Lock() + + // TryLock tries to lock the cache. + TryLock() bool + + // Unlock the cache. + Unlock() + + // CondWait waits for the condition to be true. + CondWait() + + // CondSignalCondBroadcast signals the condition. + CondBroadcast() } diff --git a/flyteidl/clients/go/admin/cache/token_cache_inmemory.go b/flyteidl/clients/go/admin/cache/token_cache_inmemory.go index 9c6223fc06b..46e134ec555 100644 --- a/flyteidl/clients/go/admin/cache/token_cache_inmemory.go +++ b/flyteidl/clients/go/admin/cache/token_cache_inmemory.go @@ -2,23 +2,62 @@ package cache import ( "fmt" + "sync" + "sync/atomic" "golang.org/x/oauth2" ) type TokenCacheInMemoryProvider struct { - token *oauth2.Token + token atomic.Value + mu *sync.Mutex + cond *sync.Cond } func (t *TokenCacheInMemoryProvider) SaveToken(token *oauth2.Token) error { - t.token = token + t.token.Store(token) return nil } -func (t TokenCacheInMemoryProvider) GetToken() (*oauth2.Token, error) { - if t.token == nil { +func (t *TokenCacheInMemoryProvider) GetToken() (*oauth2.Token, error) { + tkn := t.token.Load() + if tkn == nil { return nil, fmt.Errorf("cannot find token in cache") } - return t.token, nil + return tkn.(*oauth2.Token), nil +} + +func (t *TokenCacheInMemoryProvider) PurgeIfEquals(existing *oauth2.Token) (bool, error) { + return t.token.CompareAndSwap(existing, nil), nil +} + +func (t *TokenCacheInMemoryProvider) Lock() { + t.mu.Lock() +} + +func (t *TokenCacheInMemoryProvider) TryLock() bool { + return t.mu.TryLock() +} + +func (t *TokenCacheInMemoryProvider) Unlock() { + t.mu.Unlock() +} + +// CondWait waits for the condition to be true. +func (t *TokenCacheInMemoryProvider) CondWait() { + t.cond.Wait() +} + +// CondBroadcast signals the condition. +func (t *TokenCacheInMemoryProvider) CondBroadcast() { + t.cond.Broadcast() +} + +func NewTokenCacheInMemoryProvider() *TokenCacheInMemoryProvider { + return &TokenCacheInMemoryProvider{ + mu: &sync.Mutex{}, + token: atomic.Value{}, + cond: sync.NewCond(&sync.Mutex{}), + } } diff --git a/flyteidl/clients/go/admin/client_builder.go b/flyteidl/clients/go/admin/client_builder.go index 25b263ecf1c..0d1341bf7b8 100644 --- a/flyteidl/clients/go/admin/client_builder.go +++ b/flyteidl/clients/go/admin/client_builder.go @@ -40,7 +40,7 @@ func (cb *ClientsetBuilder) WithDialOptions(opts ...grpc.DialOption) *ClientsetB // Build the clientset using the current state of the ClientsetBuilder func (cb *ClientsetBuilder) Build(ctx context.Context) (*Clientset, error) { if cb.tokenCache == nil { - cb.tokenCache = &cache.TokenCacheInMemoryProvider{} + cb.tokenCache = cache.NewTokenCacheInMemoryProvider() } if cb.config == nil { diff --git a/flyteidl/clients/go/admin/client_builder_test.go b/flyteidl/clients/go/admin/client_builder_test.go index c871bcb326d..89bcc385505 100644 --- a/flyteidl/clients/go/admin/client_builder_test.go +++ b/flyteidl/clients/go/admin/client_builder_test.go @@ -17,9 +17,9 @@ func TestClientsetBuilder_Build(t *testing.T) { cb := NewClientsetBuilder().WithConfig(&Config{ UseInsecureConnection: true, Endpoint: config.URL{URL: *u}, - }).WithTokenCache(&cache.TokenCacheInMemoryProvider{}) + }).WithTokenCache(cache.NewTokenCacheInMemoryProvider()) ctx := context.Background() _, err := cb.Build(ctx) assert.NoError(t, err) - assert.True(t, reflect.TypeOf(cb.tokenCache) == reflect.TypeOf(&cache.TokenCacheInMemoryProvider{})) + assert.True(t, reflect.TypeOf(cb.tokenCache) == reflect.TypeOf(cache.NewTokenCacheInMemoryProvider())) } diff --git a/flyteidl/clients/go/admin/client_test.go b/flyteidl/clients/go/admin/client_test.go index eb19b76f471..042a8266921 100644 --- a/flyteidl/clients/go/admin/client_test.go +++ b/flyteidl/clients/go/admin/client_test.go @@ -255,6 +255,8 @@ func TestGetAuthenticationDialOptionPkce(t *testing.T) { mockAuthClient := new(mocks.AuthMetadataServiceClient) mockTokenCache.OnGetTokenMatch().Return(&tokenData, nil) mockTokenCache.OnSaveTokenMatch(mock.Anything).Return(nil) + mockTokenCache.On("Lock").Return() + mockTokenCache.On("Unlock").Return() mockAuthClient.OnGetOAuth2MetadataMatch(mock.Anything, mock.Anything).Return(metadata, nil) mockAuthClient.OnGetPublicClientConfigMatch(mock.Anything, mock.Anything).Return(clientMetatadata, nil) tokenSourceProvider, err := NewTokenSourceProvider(ctx, adminServiceConfig, mockTokenCache, mockAuthClient) @@ -288,7 +290,7 @@ func Test_getPkceAuthTokenSource(t *testing.T) { assert.NoError(t, err) // populate the cache - tokenCache := &cache.TokenCacheInMemoryProvider{} + tokenCache := cache.NewTokenCacheInMemoryProvider() assert.NoError(t, tokenCache.SaveToken(&tokenData)) baseOrchestrator := tokenorchestrator.BaseTokenOrchestrator{ diff --git a/flyteidl/clients/go/admin/deviceflow/token_orchestrator_test.go b/flyteidl/clients/go/admin/deviceflow/token_orchestrator_test.go index 3ecbeb746e3..4edf8c83f6f 100644 --- a/flyteidl/clients/go/admin/deviceflow/token_orchestrator_test.go +++ b/flyteidl/clients/go/admin/deviceflow/token_orchestrator_test.go @@ -25,7 +25,7 @@ import ( func TestFetchFromAuthFlow(t *testing.T) { ctx := context.Background() t.Run("fetch from auth flow", func(t *testing.T) { - tokenCache := &cache.TokenCacheInMemoryProvider{} + tokenCache := cache.NewTokenCacheInMemoryProvider() orchestrator, err := NewDeviceFlowTokenOrchestrator(tokenorchestrator.BaseTokenOrchestrator{ ClientConfig: &oauth.Config{ Config: &oauth2.Config{ @@ -98,7 +98,7 @@ func TestFetchFromAuthFlow(t *testing.T) { })) defer fakeServer.Close() - tokenCache := &cache.TokenCacheInMemoryProvider{} + tokenCache := cache.NewTokenCacheInMemoryProvider() orchestrator, err := NewDeviceFlowTokenOrchestrator(tokenorchestrator.BaseTokenOrchestrator{ ClientConfig: &oauth.Config{ Config: &oauth2.Config{ diff --git a/flyteidl/clients/go/admin/pkce/auth_flow_orchestrator_test.go b/flyteidl/clients/go/admin/pkce/auth_flow_orchestrator_test.go index dc1c80f63ac..ca1973ea669 100644 --- a/flyteidl/clients/go/admin/pkce/auth_flow_orchestrator_test.go +++ b/flyteidl/clients/go/admin/pkce/auth_flow_orchestrator_test.go @@ -16,7 +16,7 @@ import ( func TestFetchFromAuthFlow(t *testing.T) { ctx := context.Background() t.Run("fetch from auth flow", func(t *testing.T) { - tokenCache := &cache.TokenCacheInMemoryProvider{} + tokenCache := cache.NewTokenCacheInMemoryProvider() orchestrator, err := NewTokenOrchestrator(tokenorchestrator.BaseTokenOrchestrator{ ClientConfig: &oauth.Config{ Config: &oauth2.Config{ diff --git a/flyteidl/clients/go/admin/token_source_provider.go b/flyteidl/clients/go/admin/token_source_provider.go index 44ed5048e24..eb6ecac1e53 100644 --- a/flyteidl/clients/go/admin/token_source_provider.go +++ b/flyteidl/clients/go/admin/token_source_provider.go @@ -191,7 +191,7 @@ func NewClientCredentialsTokenSourceProvider(ctx context.Context, cfg *Config, s } secret = strings.TrimSpace(secret) if tokenCache == nil { - tokenCache = &cache.TokenCacheInMemoryProvider{} + tokenCache = cache.NewTokenCacheInMemoryProvider() } return ClientCredentialsTokenSourceProvider{ ccConfig: clientcredentials.Config{ @@ -249,13 +249,15 @@ func (s *customTokenSource) Token() (*oauth2.Token, error) { return nil }) if err != nil { - return nil, err + logger.Warnf(s.ctx, "failed to get token: %v", err) + return nil, fmt.Errorf("failed to get token: %w", err) } + logger.Infof(s.ctx, "retrieved token with expiry %v", token.Expiry) err = s.tokenCache.SaveToken(token) if err != nil { - logger.Warnf(s.ctx, "failed to cache token: %w", err) + logger.Warnf(s.ctx, "failed to cache token: %v", err) } return token, nil diff --git a/flyteidl/clients/go/admin/token_source_provider_test.go b/flyteidl/clients/go/admin/token_source_provider_test.go index 63fc1aa56ec..43d0fdd9280 100644 --- a/flyteidl/clients/go/admin/token_source_provider_test.go +++ b/flyteidl/clients/go/admin/token_source_provider_test.go @@ -127,7 +127,9 @@ func TestCustomTokenSource_Token(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { tokenCache := &tokenCacheMocks.TokenCache{} - tokenCache.OnGetToken().Return(test.token, nil).Once() + tokenCache.OnGetToken().Return(test.token, nil).Maybe() + tokenCache.On("Lock").Return().Maybe() + tokenCache.On("Unlock").Return().Maybe() provider, err := NewClientCredentialsTokenSourceProvider(ctx, cfg, []string{}, "", tokenCache, "") assert.NoError(t, err) source, err := provider.GetTokenSource(ctx) diff --git a/flyteidl/clients/go/admin/tokenorchestrator/base_token_orchestrator.go b/flyteidl/clients/go/admin/tokenorchestrator/base_token_orchestrator.go index 6fddeee67e4..9285a82ac94 100644 --- a/flyteidl/clients/go/admin/tokenorchestrator/base_token_orchestrator.go +++ b/flyteidl/clients/go/admin/tokenorchestrator/base_token_orchestrator.go @@ -3,8 +3,6 @@ package tokenorchestrator import ( "context" "fmt" - "time" - "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service" "golang.org/x/oauth2" @@ -54,22 +52,29 @@ func (t BaseTokenOrchestrator) FetchTokenFromCacheOrRefreshIt(ctx context.Contex return nil, err } - if !token.Valid() { - return nil, fmt.Errorf("token from cache is invalid") + if token.Valid() { + return token, nil + } + + t.TokenCache.Lock() + defer t.TokenCache.Unlock() + + token, err = t.TokenCache.GetToken() + if err != nil { + return nil, err } - // If token doesn't need to be refreshed, return it. - if time.Now().Before(token.Expiry.Add(-tokenRefreshGracePeriod.Duration)) { - logger.Infof(ctx, "found the token in the cache") + if token.Valid() { return token, nil } - token.Expiry = token.Expiry.Add(-tokenRefreshGracePeriod.Duration) token, err = t.RefreshToken(ctx, token) if err != nil { return nil, fmt.Errorf("failed to refresh token using cached token. Error: %w", err) } + token.Expiry = token.Expiry.Add(-tokenRefreshGracePeriod.Duration) + if !token.Valid() { return nil, fmt.Errorf("refreshed token is invalid") } diff --git a/flyteidl/clients/go/admin/tokenorchestrator/base_token_orchestrator_test.go b/flyteidl/clients/go/admin/tokenorchestrator/base_token_orchestrator_test.go index ed4afa0ff05..0a1a9f4985a 100644 --- a/flyteidl/clients/go/admin/tokenorchestrator/base_token_orchestrator_test.go +++ b/flyteidl/clients/go/admin/tokenorchestrator/base_token_orchestrator_test.go @@ -26,7 +26,7 @@ func TestRefreshTheToken(t *testing.T) { ClientID: "dummyClient", }, } - tokenCacheProvider := &cache.TokenCacheInMemoryProvider{} + tokenCacheProvider := cache.NewTokenCacheInMemoryProvider() orchestrator := BaseTokenOrchestrator{ ClientConfig: clientConf, TokenCache: tokenCacheProvider, @@ -58,7 +58,7 @@ func TestFetchFromCache(t *testing.T) { mockAuthClient.OnGetPublicClientConfigMatch(mock.Anything, mock.Anything).Return(clientMetatadata, nil) t.Run("no token in cache", func(t *testing.T) { - tokenCacheProvider := &cache.TokenCacheInMemoryProvider{} + tokenCacheProvider := cache.NewTokenCacheInMemoryProvider() orchestrator, err := NewBaseTokenOrchestrator(ctx, tokenCacheProvider, mockAuthClient) @@ -69,7 +69,7 @@ func TestFetchFromCache(t *testing.T) { }) t.Run("token in cache", func(t *testing.T) { - tokenCacheProvider := &cache.TokenCacheInMemoryProvider{} + tokenCacheProvider := cache.NewTokenCacheInMemoryProvider() orchestrator, err := NewBaseTokenOrchestrator(ctx, tokenCacheProvider, mockAuthClient) assert.NoError(t, err) fileData, _ := os.ReadFile("testdata/token.json") @@ -86,7 +86,7 @@ func TestFetchFromCache(t *testing.T) { }) t.Run("expired token in cache", func(t *testing.T) { - tokenCacheProvider := &cache.TokenCacheInMemoryProvider{} + tokenCacheProvider := cache.NewTokenCacheInMemoryProvider() orchestrator, err := NewBaseTokenOrchestrator(ctx, tokenCacheProvider, mockAuthClient) assert.NoError(t, err) fileData, _ := os.ReadFile("testdata/token.json") diff --git a/flyteidl/clients/go/assets/admin.swagger.json b/flyteidl/clients/go/assets/admin.swagger.json index e60790b729d..7114cbcaf1e 100644 --- a/flyteidl/clients/go/assets/admin.swagger.json +++ b/flyteidl/clients/go/assets/admin.swagger.json @@ -12389,6 +12389,19 @@ }, "description": "This configuration allows executing raw containers in Flyte using the Flyte CoPilot system.\nFlyte CoPilot, eliminates the needs of flytekit or sdk inside the container. Any inputs required by the users container are side-loaded in the input_path\nAny outputs generated by the user container - within output_path are automatically uploaded." }, + "coreEnumType": { + "type": "object", + "properties": { + "values": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Predefined set of enum values." + } + }, + "description": "Enables declaring enum types, with predefined string values\nFor len(values) \u003e 0, the first value in the ordered list is regarded as the default value. If you wish\nTo provide no defaults, make the first value as undefined." + }, "coreError": { "type": "object", "properties": { @@ -12793,7 +12806,7 @@ "description": "A blob might have specialized implementation details depending on associated metadata." }, "enum_type": { - "$ref": "#/definitions/flyteidlcoreEnumType", + "$ref": "#/definitions/coreEnumType", "description": "Defines an enum with pre-defined string values." }, "structured_dataset_type": { @@ -14313,19 +14326,6 @@ }, "title": "Metadata for a WorkflowNode" }, - "flyteidlcoreEnumType": { - "type": "object", - "properties": { - "values": { - "type": "array", - "items": { - "type": "string" - }, - "description": "Predefined set of enum values." - } - }, - "description": "Enables declaring enum types, with predefined string values\nFor len(values) \u003e 0, the first value in the ordered list is regarded as the default value. If you wish\nTo provide no defaults, make the first value as undefined." - }, "flyteidlcoreKeyValuePair": { "type": "object", "properties": { @@ -14469,11 +14469,11 @@ "properties": { "@type": { "type": "string", - "description": "A URL/resource name that uniquely identifies the type of the serialized\nprotocol buffer message. This string must contain at least\none \"/\" character. The last segment of the URL's path must represent\nthe fully qualified name of the type (as in\n`path/google.protobuf.Duration`). The name should be in a canonical form\n(e.g., leading \".\" is not accepted).\n\nIn practice, teams usually precompile into the binary all types that they\nexpect it to use in the context of Any. However, for URLs which use the\nscheme `http`, `https`, or no scheme, one can optionally set up a type\nserver that maps type URLs to message definitions as follows:\n\n* If no scheme is provided, `https` is assumed.\n* An HTTP GET on the URL must yield a [google.protobuf.Type][]\n value in binary format, or produce an error.\n* Applications are allowed to cache lookup results based on the\n URL, or have them precompiled into a binary to avoid any\n lookup. Therefore, binary compatibility needs to be preserved\n on changes to types. (Use versioned type names to manage\n breaking changes.)\n\nNote: this functionality is not currently available in the official\nprotobuf release, and it is not used for type URLs beginning with\ntype.googleapis.com. As of May 2023, there are no widely used type server\nimplementations and no plans to implement one.\n\nSchemes other than `http`, `https` (or the empty scheme) might be\nused with implementation specific semantics." + "description": "A URL/resource name that uniquely identifies the type of the serialized\nprotocol buffer message. This string must contain at least\none \"/\" character. The last segment of the URL's path must represent\nthe fully qualified name of the type (as in\n`path/google.protobuf.Duration`). The name should be in a canonical form\n(e.g., leading \".\" is not accepted).\n\nIn practice, teams usually precompile into the binary all types that they\nexpect it to use in the context of Any. However, for URLs which use the\nscheme `http`, `https`, or no scheme, one can optionally set up a type\nserver that maps type URLs to message definitions as follows:\n\n* If no scheme is provided, `https` is assumed.\n* An HTTP GET on the URL must yield a [google.protobuf.Type][]\n value in binary format, or produce an error.\n* Applications are allowed to cache lookup results based on the\n URL, or have them precompiled into a binary to avoid any\n lookup. Therefore, binary compatibility needs to be preserved\n on changes to types. (Use versioned type names to manage\n breaking changes.)\n\nNote: this functionality is not currently available in the official\nprotobuf release, and it is not used for type URLs beginning with\ntype.googleapis.com.\n\nSchemes other than `http`, `https` (or the empty scheme) might be\nused with implementation specific semantics." } }, "additionalProperties": {}, - "description": "`Any` contains an arbitrary serialized protocol buffer message along with a\nURL that describes the type of the serialized message.\n\nProtobuf library provides support to pack/unpack Any values in the form\nof utility functions or additional generated methods of the Any type.\n\nExample 1: Pack and unpack a message in C++.\n\n Foo foo = ...;\n Any any;\n any.PackFrom(foo);\n ...\n if (any.UnpackTo(\u0026foo)) {\n ...\n }\n\nExample 2: Pack and unpack a message in Java.\n\n Foo foo = ...;\n Any any = Any.pack(foo);\n ...\n if (any.is(Foo.class)) {\n foo = any.unpack(Foo.class);\n }\n // or ...\n if (any.isSameTypeAs(Foo.getDefaultInstance())) {\n foo = any.unpack(Foo.getDefaultInstance());\n }\n\n Example 3: Pack and unpack a message in Python.\n\n foo = Foo(...)\n any = Any()\n any.Pack(foo)\n ...\n if any.Is(Foo.DESCRIPTOR):\n any.Unpack(foo)\n ...\n\n Example 4: Pack and unpack a message in Go\n\n foo := \u0026pb.Foo{...}\n any, err := anypb.New(foo)\n if err != nil {\n ...\n }\n ...\n foo := \u0026pb.Foo{}\n if err := any.UnmarshalTo(foo); err != nil {\n ...\n }\n\nThe pack methods provided by protobuf library will by default use\n'type.googleapis.com/full.type.name' as the type URL and the unpack\nmethods only use the fully qualified type name after the last '/'\nin the type URL, for example \"foo.bar.com/x/y.z\" will yield type\nname \"y.z\".\n\nJSON\n====\nThe JSON representation of an `Any` value uses the regular\nrepresentation of the deserialized, embedded message, with an\nadditional field `@type` which contains the type URL. Example:\n\n package google.profile;\n message Person {\n string first_name = 1;\n string last_name = 2;\n }\n\n {\n \"@type\": \"type.googleapis.com/google.profile.Person\",\n \"firstName\": \u003cstring\u003e,\n \"lastName\": \u003cstring\u003e\n }\n\nIf the embedded message type is well-known and has a custom JSON\nrepresentation, that representation will be embedded adding a field\n`value` which holds the custom JSON in addition to the `@type`\nfield. Example (for message [google.protobuf.Duration][]):\n\n {\n \"@type\": \"type.googleapis.com/google.protobuf.Duration\",\n \"value\": \"1.212s\"\n }" + "description": "`Any` contains an arbitrary serialized protocol buffer message along with a\nURL that describes the type of the serialized message.\n\nProtobuf library provides support to pack/unpack Any values in the form\nof utility functions or additional generated methods of the Any type.\n\nExample 1: Pack and unpack a message in C++.\n\n Foo foo = ...;\n Any any;\n any.PackFrom(foo);\n ...\n if (any.UnpackTo(\u0026foo)) {\n ...\n }\n\nExample 2: Pack and unpack a message in Java.\n\n Foo foo = ...;\n Any any = Any.pack(foo);\n ...\n if (any.is(Foo.class)) {\n foo = any.unpack(Foo.class);\n }\n // or ...\n if (any.isSameTypeAs(Foo.getDefaultInstance())) {\n foo = any.unpack(Foo.getDefaultInstance());\n }\n\nExample 3: Pack and unpack a message in Python.\n\n foo = Foo(...)\n any = Any()\n any.Pack(foo)\n ...\n if any.Is(Foo.DESCRIPTOR):\n any.Unpack(foo)\n ...\n\nExample 4: Pack and unpack a message in Go\n\n foo := \u0026pb.Foo{...}\n any, err := anypb.New(foo)\n if err != nil {\n ...\n }\n ...\n foo := \u0026pb.Foo{}\n if err := any.UnmarshalTo(foo); err != nil {\n ...\n }\n\nThe pack methods provided by protobuf library will by default use\n'type.googleapis.com/full.type.name' as the type URL and the unpack\nmethods only use the fully qualified type name after the last '/'\nin the type URL, for example \"foo.bar.com/x/y.z\" will yield type\nname \"y.z\".\n\nJSON\n\nThe JSON representation of an `Any` value uses the regular\nrepresentation of the deserialized, embedded message, with an\nadditional field `@type` which contains the type URL. Example:\n\n package google.profile;\n message Person {\n string first_name = 1;\n string last_name = 2;\n }\n\n {\n \"@type\": \"type.googleapis.com/google.profile.Person\",\n \"firstName\": \u003cstring\u003e,\n \"lastName\": \u003cstring\u003e\n }\n\nIf the embedded message type is well-known and has a custom JSON\nrepresentation, that representation will be embedded adding a field\n`value` which holds the custom JSON in addition to the `@type`\nfield. Example (for message [google.protobuf.Duration][]):\n\n {\n \"@type\": \"type.googleapis.com/google.protobuf.Duration\",\n \"value\": \"1.212s\"\n }" }, "protobufNullValue": { "type": "string", @@ -14481,7 +14481,7 @@ "NULL_VALUE" ], "default": "NULL_VALUE", - "description": "`NullValue` is a singleton enumeration to represent the null value for the\n`Value` type union.\n\nThe JSON representation for `NullValue` is JSON `null`.\n\n - NULL_VALUE: Null value." + "description": "`NullValue` is a singleton enumeration to represent the null value for the\n`Value` type union.\n\n The JSON representation for `NullValue` is JSON `null`.\n\n - NULL_VALUE: Null value." }, "serviceAdminServiceCreateTaskBody": { "type": "object", diff --git a/flyteidl/go.mod b/flyteidl/go.mod index 1389920eb3f..c6c39b7ecae 100644 --- a/flyteidl/go.mod +++ b/flyteidl/go.mod @@ -47,7 +47,7 @@ require ( github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/fatih/color v1.13.0 // indirect - github.com/flyteorg/stow v0.3.8 // indirect + github.com/flyteorg/stow v0.3.10 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/flyteidl/go.sum b/flyteidl/go.sum index 29545665484..856319bed6e 100644 --- a/flyteidl/go.sum +++ b/flyteidl/go.sum @@ -58,8 +58,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= -github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= +github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/flyteplugins/go.mod b/flyteplugins/go.mod index 35c768fbd8d..9e110c583dd 100644 --- a/flyteplugins/go.mod +++ b/flyteplugins/go.mod @@ -72,7 +72,7 @@ require ( github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/fatih/color v1.13.0 // indirect - github.com/flyteorg/stow v0.3.8 // indirect + github.com/flyteorg/stow v0.3.10 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-logr/logr v1.4.1 // indirect diff --git a/flyteplugins/go.sum b/flyteplugins/go.sum index a4218686bdf..f6bbc892d22 100644 --- a/flyteplugins/go.sum +++ b/flyteplugins/go.sum @@ -141,8 +141,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= -github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= +github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/flytepropeller/go.mod b/flytepropeller/go.mod index 04e5455c074..9d9eb637729 100644 --- a/flytepropeller/go.mod +++ b/flytepropeller/go.mod @@ -84,7 +84,7 @@ require ( github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect - github.com/flyteorg/stow v0.3.8 // indirect + github.com/flyteorg/stow v0.3.10 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/flytepropeller/go.sum b/flytepropeller/go.sum index c39cf98d503..2525211e6bc 100644 --- a/flytepropeller/go.sum +++ b/flytepropeller/go.sum @@ -151,8 +151,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= -github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= +github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/flytestdlib/go.mod b/flytestdlib/go.mod index 9d1d7c491f3..067fd8ec2e5 100644 --- a/flytestdlib/go.mod +++ b/flytestdlib/go.mod @@ -9,7 +9,7 @@ require ( github.com/ernesto-jimenez/gogen v0.0.0-20180125220232-d7d4131e6607 github.com/fatih/color v1.13.0 github.com/fatih/structtag v1.2.0 - github.com/flyteorg/stow v0.3.8 + github.com/flyteorg/stow v0.3.10 github.com/fsnotify/fsnotify v1.6.0 github.com/ghodss/yaml v1.0.0 github.com/go-gormigrate/gormigrate/v2 v2.1.1 diff --git a/flytestdlib/go.sum b/flytestdlib/go.sum index 4ea3652f9d6..3a0c13475b7 100644 --- a/flytestdlib/go.sum +++ b/flytestdlib/go.sum @@ -115,8 +115,8 @@ github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/structtag v1.2.0 h1:/OdNE99OxoI/PqaW/SuSK9uxxT3f/tcSZgon/ssNSx4= github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= -github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= -github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= +github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/flytestdlib/storage/stow_store.go b/flytestdlib/storage/stow_store.go index d10b706ac7b..2d23059972b 100644 --- a/flytestdlib/storage/stow_store.go +++ b/flytestdlib/storage/stow_store.go @@ -357,7 +357,7 @@ func (s *StowStore) CreateSignedURL(ctx context.Context, reference DataReference return SignedURLResponse{}, err } - urlStr, err := c.PreSignRequest(ctx, properties.Scope, key, stow.PresignRequestParams{ + resp, err := c.PreSignRequest(ctx, properties.Scope, key, stow.PresignRequestParams{ ExpiresIn: properties.ExpiresIn, ContentMD5: properties.ContentMD5, }) @@ -366,7 +366,7 @@ func (s *StowStore) CreateSignedURL(ctx context.Context, reference DataReference return SignedURLResponse{}, err } - urlVal, err := url.Parse(urlStr) + urlVal, err := url.Parse(resp.Url) if err != nil { return SignedURLResponse{}, err } diff --git a/flytestdlib/storage/stow_store_test.go b/flytestdlib/storage/stow_store_test.go index d888977c577..c66ecfad4eb 100644 --- a/flytestdlib/storage/stow_store_test.go +++ b/flytestdlib/storage/stow_store_test.go @@ -53,8 +53,8 @@ type mockStowContainer struct { // CreateSignedURL creates a signed url with the provided properties. func (m mockStowContainer) PreSignRequest(_ context.Context, _ stow.ClientMethod, s string, - _ stow.PresignRequestParams) (url string, err error) { - return s, nil + _ stow.PresignRequestParams) (urlResp stow.PresignResponse, err error) { + return stow.PresignResponse{Url: s}, nil } func (m mockStowContainer) ID() string { diff --git a/go.mod b/go.mod index ed927351362..40daf6cfeaa 100644 --- a/go.mod +++ b/go.mod @@ -77,7 +77,7 @@ require ( github.com/felixge/httpsnoop v1.0.4 // indirect github.com/flyteorg/flyte/flyteidl v1.10.7-b0.0.20240109185447-d86f91995c44 // indirect github.com/flyteorg/flyte/flyteplugins v1.10.7-0.20240104005944-64548962d375 // indirect - github.com/flyteorg/stow v0.3.8 // indirect + github.com/flyteorg/stow v0.3.10 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/ghodss/yaml v1.0.1-0.20220118164431-d8423dcdf344 // indirect github.com/go-gormigrate/gormigrate/v2 v2.1.1 // indirect diff --git a/go.sum b/go.sum index f122afa2a44..bb236557ea6 100644 --- a/go.sum +++ b/go.sum @@ -284,8 +284,8 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= -github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= +github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=