diff --git a/cacheservice/go.mod b/cacheservice/go.mod index c7e8a7625ab..d5676ba50e0 100644 --- a/cacheservice/go.mod +++ b/cacheservice/go.mod @@ -81,7 +81,7 @@ require ( github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/fatih/color v1.13.0 // indirect - github.com/flyteorg/stow v0.3.10 // indirect + github.com/flyteorg/stow v0.3.8 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-logr/logr v1.4.1 // indirect diff --git a/cacheservice/go.sum b/cacheservice/go.sum index dcd2862f732..b7e2b841b9f 100644 --- a/cacheservice/go.sum +++ b/cacheservice/go.sum @@ -149,8 +149,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= -github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= +github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/datacatalog/go.mod b/datacatalog/go.mod index 56b004fd744..a23ceb162bd 100644 --- a/datacatalog/go.mod +++ b/datacatalog/go.mod @@ -44,7 +44,7 @@ require ( github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/fatih/color v1.13.0 // indirect - github.com/flyteorg/stow v0.3.10 // indirect + github.com/flyteorg/stow v0.3.8 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-gormigrate/gormigrate/v2 v2.1.1 // indirect diff --git a/datacatalog/go.sum b/datacatalog/go.sum index 6ab73acf0d1..5a1c875f859 100644 --- a/datacatalog/go.sum +++ b/datacatalog/go.sum @@ -113,8 +113,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= -github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= +github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/fasttask/plugin/go.mod b/fasttask/plugin/go.mod index 441ad8cc095..2484572ccef 100644 --- a/fasttask/plugin/go.mod +++ b/fasttask/plugin/go.mod @@ -43,7 +43,7 @@ require ( github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/fatih/color v1.13.0 // indirect github.com/flyteorg/flyte/flytepropeller v0.0.0-00010101000000-000000000000 // indirect - github.com/flyteorg/stow v0.3.10 // indirect + github.com/flyteorg/stow v0.3.8 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/fasttask/plugin/go.sum b/fasttask/plugin/go.sum index affebdbb635..108f2187b71 100644 --- a/fasttask/plugin/go.sum +++ b/fasttask/plugin/go.sum @@ -62,8 +62,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= -github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= +github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/fasttask/plugin/service.go b/fasttask/plugin/service.go index f94fb29acc7..6edf3ad38b4 100644 --- a/fasttask/plugin/service.go +++ b/fasttask/plugin/service.go @@ -173,10 +173,12 @@ func (f *FastTaskService) Heartbeat(stream pb.FastTask_HeartbeatServer) error { // if taskStatus is complete then enqueueOwner for fast feedback phase := core.Phase(taskStatus.GetPhase()) if phase == core.PhaseSuccess || phase == core.PhaseRetryableFailure { - if err := f.enqueueOwner(types.NamespacedName{ + ownerID := types.NamespacedName{ Namespace: taskStatus.GetNamespace(), Name: taskStatus.GetWorkflowId(), - }); err != nil { + } + + if err := f.enqueueOwner(ownerID); err != nil { logger.Warnf(context.Background(), "failed to enqueue owner for task %s: %+v", taskStatus.GetTaskId(), err) } } diff --git a/flyteadmin/go.mod b/flyteadmin/go.mod index 7c1f3f14edd..3632bb22440 100644 --- a/flyteadmin/go.mod +++ b/flyteadmin/go.mod @@ -17,7 +17,7 @@ require ( github.com/flyteorg/flyte/flyteplugins v1.10.7-0.20240104005944-64548962d375 github.com/flyteorg/flyte/flytepropeller v1.10.7-0.20240104005944-64548962d375 github.com/flyteorg/flyte/flytestdlib v1.10.7-0.20240104005944-64548962d375 - github.com/flyteorg/stow v0.3.10 + github.com/flyteorg/stow v0.3.8 github.com/ghodss/yaml v1.0.1-0.20220118164431-d8423dcdf344 github.com/go-gormigrate/gormigrate/v2 v2.1.1 github.com/golang-jwt/jwt/v4 v4.5.0 diff --git a/flyteadmin/go.sum b/flyteadmin/go.sum index 6f7c9ccce75..473db1d4bab 100644 --- a/flyteadmin/go.sum +++ b/flyteadmin/go.sum @@ -246,8 +246,8 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= -github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= +github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= diff --git a/flytecopilot/go.mod b/flytecopilot/go.mod index 21c3c1f12cd..4832ec9177d 100644 --- a/flytecopilot/go.mod +++ b/flytecopilot/go.mod @@ -40,7 +40,7 @@ require ( github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/fatih/color v1.13.0 // indirect - github.com/flyteorg/stow v0.3.10 // indirect + github.com/flyteorg/stow v0.3.8 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect diff --git a/flytecopilot/go.sum b/flytecopilot/go.sum index f18b1731264..eaee1f12ac2 100644 --- a/flytecopilot/go.sum +++ b/flytecopilot/go.sum @@ -103,8 +103,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= -github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= +github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/flyteidl/clients/go/admin/auth_interceptor.go b/flyteidl/clients/go/admin/auth_interceptor.go index a5cdeab4ef7..09391df67ae 100644 --- a/flyteidl/clients/go/admin/auth_interceptor.go +++ b/flyteidl/clients/go/admin/auth_interceptor.go @@ -6,23 +6,22 @@ import ( "fmt" "net/http" + "github.com/flyteorg/flyte/flyteidl/clients/go/admin/cache" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service" + "github.com/flyteorg/flyte/flytestdlib/logger" "golang.org/x/oauth2" - "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/flyteorg/flyte/flyteidl/clients/go/admin/cache" - "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service" - "github.com/flyteorg/flyte/flytestdlib/logger" + "google.golang.org/grpc" ) const ProxyAuthorizationHeader = "proxy-authorization" // MaterializeCredentials will attempt to build a TokenSource given the anonymously available information exposed by the server. // Once established, it'll invoke PerRPCCredentialsFuture.Store() on perRPCCredentials to populate it with the appropriate values. -func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, - perRPCCredentials *PerRPCCredentialsFuture, proxyCredentialsFuture *PerRPCCredentialsFuture) error { - +func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.TokenCache, perRPCCredentials *PerRPCCredentialsFuture, proxyCredentialsFuture *PerRPCCredentialsFuture) error { authMetadataClient, err := InitializeAuthMetadataClient(ctx, cfg, proxyCredentialsFuture) if err != nil { return fmt.Errorf("failed to initialized Auth Metadata Client. Error: %w", err) @@ -44,17 +43,11 @@ func MaterializeCredentials(ctx context.Context, cfg *Config, tokenCache cache.T tokenSource, err := tokenSourceProvider.GetTokenSource(ctx) if err != nil { - return fmt.Errorf("failed to get token source. Error: %w", err) - } - - _, err = tokenSource.Token() - if err != nil { - return fmt.Errorf("failed to issue token. Error: %w", err) + return err } wrappedTokenSource := NewCustomHeaderTokenSource(tokenSource, cfg.UseInsecureConnection, authorizationMetadataKey) perRPCCredentials.Store(wrappedTokenSource) - return nil } @@ -142,15 +135,6 @@ func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFut return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { ctx = setHTTPClientContext(ctx, cfg, proxyCredentialsFuture) - // If there is already a token in the cache (e.g. key-ring), we should use it immediately... - t, _ := tokenCache.GetToken() - if t != nil { - err := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture) - if err != nil { - return fmt.Errorf("failed to materialize credentials. Error: %v", err) - } - } - err := invoker(ctx, method, req, reply, cc, opts...) if err != nil { logger.Debugf(ctx, "Request failed due to [%v]. If it's an unauthenticated error, we will attempt to establish an authenticated context.", err) @@ -158,33 +142,10 @@ func NewAuthInterceptor(cfg *Config, tokenCache cache.TokenCache, credentialsFut if st, ok := status.FromError(err); ok { // If the error we receive from executing the request expects if shouldAttemptToAuthenticate(st.Code()) { - err = func() error { - if !tokenCache.TryLock() { - tokenCache.CondWait() - return nil - } - - defer tokenCache.Unlock() - _, err := tokenCache.PurgeIfEquals(t) - if err != nil && !errors.Is(err, cache.ErrNotFound) { - logger.Errorf(ctx, "Failed to purge cache. Error [%v]", err) - return fmt.Errorf("failed to purge cache. Error: %w", err) - } - - logger.Debugf(ctx, "Request failed due to [%v]. Attempting to establish an authenticated connection and trying again.", st.Code()) - newErr := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture) - if newErr != nil { - errString := fmt.Sprintf("authentication error! Original Error: %v, Auth Error: %v", err, newErr) - logger.Errorf(ctx, errString) - return fmt.Errorf(errString) - } - - tokenCache.CondBroadcast() - return nil - }() - - if err != nil { - return err + logger.Debugf(ctx, "Request failed due to [%v]. Attempting to establish an authenticated connection and trying again.", st.Code()) + newErr := MaterializeCredentials(ctx, cfg, tokenCache, credentialsFuture, proxyCredentialsFuture) + if newErr != nil { + return fmt.Errorf("authentication error! Original Error: %v, Auth Error: %w", err, newErr) } return invoker(ctx, method, req, reply, cc, opts...) @@ -207,7 +168,6 @@ func NewProxyAuthInterceptor(cfg *Config, proxyCredentialsFuture *PerRPCCredenti } return invoker(ctx, method, req, reply, cc, opts...) } - return err } } diff --git a/flyteidl/clients/go/admin/auth_interceptor_test.go b/flyteidl/clients/go/admin/auth_interceptor_test.go index 10c96625b78..ce99c992708 100644 --- a/flyteidl/clients/go/admin/auth_interceptor_test.go +++ b/flyteidl/clients/go/admin/auth_interceptor_test.go @@ -2,14 +2,13 @@ package admin import ( "context" - "encoding/json" "errors" "fmt" "io" "net" "net/http" + "net/http/httptest" "net/url" - "os" "strings" "sync" "testing" @@ -32,11 +31,10 @@ import ( // authMetadataServer is a fake AuthMetadataServer that takes in an AuthMetadataServer implementation (usually one // initialized through mockery) and starts a local server that uses it to respond to grpc requests. type authMetadataServer struct { + s *httptest.Server t testing.TB - grpcPort int - httpPort int + port int grpcServer *grpc.Server - httpServer *http.Server netListener net.Listener impl service.AuthMetadataServiceServer lck *sync.RWMutex @@ -72,49 +70,27 @@ func (s authMetadataServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.NotFound(w, r) } -func (s *authMetadataServer) tokenHandler(w http.ResponseWriter, r *http.Request) { - tokenJSON := []byte(`{"access_token": "exampletoken", "token_type": "bearer"}`) - w.Header().Set("Content-Type", "application/json") - _, err := w.Write(tokenJSON) - assert.NoError(s.t, err) -} - func (s *authMetadataServer) Start(_ context.Context) error { s.lck.Lock() defer s.lck.Unlock() /***** Set up the server serving channelz service. *****/ - - lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", s.grpcPort)) + lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", s.port)) if err != nil { - return fmt.Errorf("failed to listen on port [%v]: %w", s.grpcPort, err) + return fmt.Errorf("failed to listen on port [%v]: %w", s.port, err) } - s.netListener = lis grpcS := grpc.NewServer() service.RegisterAuthMetadataServiceServer(grpcS, s) go func() { - defer grpcS.Stop() _ = grpcS.Serve(lis) + //assert.NoError(s.t, err) }() + s.grpcServer = grpcS - mux := http.NewServeMux() - // Attach the handler to the /oauth2/token path - mux.HandleFunc("/oauth2/token", s.tokenHandler) - - //nolint:gosec - s.httpServer = &http.Server{ - Addr: fmt.Sprintf("localhost:%d", s.httpPort), - Handler: mux, - } + s.netListener = lis - go func() { - defer s.httpServer.Close() - err := s.httpServer.ListenAndServe() - if err != nil { - panic(err) - } - }() + s.s = httptest.NewServer(s) return nil } @@ -122,30 +98,25 @@ func (s *authMetadataServer) Start(_ context.Context) error { func (s *authMetadataServer) Close() { s.lck.RLock() defer s.lck.RUnlock() + s.grpcServer.Stop() + s.s.Close() } -func newAuthMetadataServer(t testing.TB, grpcPort int, httpPort int, impl service.AuthMetadataServiceServer) *authMetadataServer { +func newAuthMetadataServer(t testing.TB, port int, impl service.AuthMetadataServiceServer) *authMetadataServer { return &authMetadataServer{ - grpcPort: grpcPort, - httpPort: httpPort, - t: t, - impl: impl, - lck: &sync.RWMutex{}, + port: port, + t: t, + impl: impl, + lck: &sync.RWMutex{}, } } func Test_newAuthInterceptor(t *testing.T) { - plan, _ := os.ReadFile("tokenorchestrator/testdata/token.json") - var tokenData oauth2.Token - err := json.Unmarshal(plan, &tokenData) - assert.NoError(t, err) t.Run("Other Error", func(t *testing.T) { f := NewPerRPCCredentialsFuture() p := NewPerRPCCredentialsFuture() - mockTokenCache := &mocks.TokenCache{} - mockTokenCache.OnGetTokenMatch().Return(&tokenData, nil) - interceptor := NewAuthInterceptor(&Config{}, mockTokenCache, f, p) + interceptor := NewAuthInterceptor(&Config{}, &mocks.TokenCache{}, f, p) otherError := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { return status.New(codes.Canceled, "").Err() } @@ -158,43 +129,35 @@ func Test_newAuthInterceptor(t *testing.T) { Level: logger.DebugLevel, })) - httpPort := rand.IntnRange(10000, 60000) - grpcPort := rand.IntnRange(10000, 60000) + port := rand.IntnRange(10000, 60000) m := &adminMocks.AuthMetadataServiceServer{} m.OnGetOAuth2MetadataMatch(mock.Anything, mock.Anything).Return(&service.OAuth2MetadataResponse{ - AuthorizationEndpoint: fmt.Sprintf("http://localhost:%d/oauth2/authorize", httpPort), - TokenEndpoint: fmt.Sprintf("http://localhost:%d/oauth2/token", httpPort), - JwksUri: fmt.Sprintf("http://localhost:%d/oauth2/jwks", httpPort), + AuthorizationEndpoint: fmt.Sprintf("http://localhost:%d/oauth2/authorize", port), + TokenEndpoint: fmt.Sprintf("http://localhost:%d/oauth2/token", port), + JwksUri: fmt.Sprintf("http://localhost:%d/oauth2/jwks", port), }, nil) - m.OnGetPublicClientConfigMatch(mock.Anything, mock.Anything).Return(&service.PublicClientAuthConfigResponse{ Scopes: []string{"all"}, }, nil) - s := newAuthMetadataServer(t, grpcPort, httpPort, m) + s := newAuthMetadataServer(t, port, m) ctx := context.Background() assert.NoError(t, s.Start(ctx)) defer s.Close() - u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", grpcPort)) + u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", port)) assert.NoError(t, err) f := NewPerRPCCredentialsFuture() p := NewPerRPCCredentialsFuture() - c := &mocks.TokenCache{} - c.OnGetTokenMatch().Return(nil, nil) - c.OnTryLockMatch().Return(true) - c.OnSaveTokenMatch(mock.Anything).Return(nil) - c.On("CondBroadcast").Return() - c.On("Unlock").Return() - c.OnPurgeIfEqualsMatch(mock.Anything).Return(true, nil) interceptor := NewAuthInterceptor(&Config{ Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, AuthType: AuthTypeClientSecret, - }, c, f, p) + }, &mocks.TokenCache{}, f, p) unauthenticated := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { return status.New(codes.Unauthenticated, "").Err() } + err = interceptor(ctx, "POST", nil, nil, nil, unauthenticated) assert.Error(t, err) assert.Truef(t, f.IsInitialized(), "PerRPCCredentialFuture should be initialized") @@ -206,26 +169,24 @@ func Test_newAuthInterceptor(t *testing.T) { Level: logger.DebugLevel, })) - httpPort := rand.IntnRange(10000, 60000) - grpcPort := rand.IntnRange(10000, 60000) + port := rand.IntnRange(10000, 60000) m := &adminMocks.AuthMetadataServiceServer{} - s := newAuthMetadataServer(t, grpcPort, httpPort, m) + s := newAuthMetadataServer(t, port, m) ctx := context.Background() assert.NoError(t, s.Start(ctx)) defer s.Close() - u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", grpcPort)) + u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", port)) assert.NoError(t, err) f := NewPerRPCCredentialsFuture() p := NewPerRPCCredentialsFuture() - c := &mocks.TokenCache{} - c.OnGetTokenMatch().Return(nil, nil) + interceptor := NewAuthInterceptor(&Config{ Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, AuthType: AuthTypeClientSecret, - }, c, f, p) + }, &mocks.TokenCache{}, f, p) authenticated := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { return nil } @@ -240,39 +201,33 @@ func Test_newAuthInterceptor(t *testing.T) { Level: logger.DebugLevel, })) - httpPort := rand.IntnRange(10000, 60000) - grpcPort := rand.IntnRange(10000, 60000) + port := rand.IntnRange(10000, 60000) m := &adminMocks.AuthMetadataServiceServer{} m.OnGetOAuth2MetadataMatch(mock.Anything, mock.Anything).Return(&service.OAuth2MetadataResponse{ - AuthorizationEndpoint: fmt.Sprintf("http://localhost:%d/oauth2/authorize", httpPort), - TokenEndpoint: fmt.Sprintf("http://localhost:%d/oauth2/token", httpPort), - JwksUri: fmt.Sprintf("http://localhost:%d/oauth2/jwks", httpPort), + AuthorizationEndpoint: fmt.Sprintf("http://localhost:%d/oauth2/authorize", port), + TokenEndpoint: fmt.Sprintf("http://localhost:%d/oauth2/token", port), + JwksUri: fmt.Sprintf("http://localhost:%d/oauth2/jwks", port), }, nil) m.OnGetPublicClientConfigMatch(mock.Anything, mock.Anything).Return(&service.PublicClientAuthConfigResponse{ Scopes: []string{"all"}, }, nil) - s := newAuthMetadataServer(t, grpcPort, httpPort, m) + s := newAuthMetadataServer(t, port, m) ctx := context.Background() assert.NoError(t, s.Start(ctx)) defer s.Close() - u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", grpcPort)) + u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", port)) assert.NoError(t, err) f := NewPerRPCCredentialsFuture() p := NewPerRPCCredentialsFuture() - c := &mocks.TokenCache{} - c.OnGetTokenMatch().Return(nil, nil) - c.OnTryLockMatch().Return(true) - c.OnSaveTokenMatch(mock.Anything).Return(nil) - c.OnPurgeIfEqualsMatch(mock.Anything).Return(true, nil) interceptor := NewAuthInterceptor(&Config{ Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, AuthType: AuthTypeClientSecret, - }, c, f, p) + }, &mocks.TokenCache{}, f, p) unauthenticated := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { return status.New(codes.Aborted, "").Err() } @@ -284,21 +239,17 @@ func Test_newAuthInterceptor(t *testing.T) { } func TestMaterializeCredentials(t *testing.T) { + port := rand.IntnRange(10000, 60000) t.Run("No oauth2 metadata endpoint or Public client config lookup", func(t *testing.T) { - httpPort := rand.IntnRange(10000, 60000) - grpcPort := rand.IntnRange(10000, 60000) - c := &mocks.TokenCache{} - c.OnGetTokenMatch().Return(nil, nil) - c.OnSaveTokenMatch(mock.Anything).Return(nil) m := &adminMocks.AuthMetadataServiceServer{} m.OnGetOAuth2MetadataMatch(mock.Anything, mock.Anything).Return(nil, errors.New("unexpected call to get oauth2 metadata")) m.OnGetPublicClientConfigMatch(mock.Anything, mock.Anything).Return(nil, errors.New("unexpected call to get public client config")) - s := newAuthMetadataServer(t, grpcPort, httpPort, m) + s := newAuthMetadataServer(t, port, m) ctx := context.Background() assert.NoError(t, s.Start(ctx)) defer s.Close() - u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", grpcPort)) + u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", port)) assert.NoError(t, err) f := NewPerRPCCredentialsFuture() @@ -308,29 +259,24 @@ func TestMaterializeCredentials(t *testing.T) { Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, AuthType: AuthTypeClientSecret, - TokenURL: fmt.Sprintf("http://localhost:%d/oauth2/token", httpPort), + TokenURL: fmt.Sprintf("http://localhost:%d/api/v1/token", port), Scopes: []string{"all"}, Audience: "http://localhost:30081", AuthorizationHeader: "authorization", - }, c, f, p) + }, &mocks.TokenCache{}, f, p) assert.NoError(t, err) }) t.Run("Failed to fetch client metadata", func(t *testing.T) { - httpPort := rand.IntnRange(10000, 60000) - grpcPort := rand.IntnRange(10000, 60000) - c := &mocks.TokenCache{} - c.OnGetTokenMatch().Return(nil, nil) - c.OnSaveTokenMatch(mock.Anything).Return(nil) m := &adminMocks.AuthMetadataServiceServer{} m.OnGetOAuth2MetadataMatch(mock.Anything, mock.Anything).Return(nil, errors.New("unexpected call to get oauth2 metadata")) failedPublicClientConfigLookup := errors.New("expected err") m.OnGetPublicClientConfigMatch(mock.Anything, mock.Anything).Return(nil, failedPublicClientConfigLookup) - s := newAuthMetadataServer(t, grpcPort, httpPort, m) + s := newAuthMetadataServer(t, port, m) ctx := context.Background() assert.NoError(t, s.Start(ctx)) defer s.Close() - u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", grpcPort)) + u, err := url.Parse(fmt.Sprintf("dns:///localhost:%d", port)) assert.NoError(t, err) f := NewPerRPCCredentialsFuture() @@ -340,9 +286,9 @@ func TestMaterializeCredentials(t *testing.T) { Endpoint: config.URL{URL: *u}, UseInsecureConnection: true, AuthType: AuthTypeClientSecret, - TokenURL: fmt.Sprintf("http://localhost:%d/api/v1/token", httpPort), + TokenURL: fmt.Sprintf("http://localhost:%d/api/v1/token", port), Scopes: []string{"all"}, - }, c, f, p) + }, &mocks.TokenCache{}, f, p) assert.EqualError(t, err, "failed to fetch client metadata. Error: rpc error: code = Unknown desc = expected err") }) } diff --git a/flyteidl/clients/go/admin/cache/mocks/token_cache.go b/flyteidl/clients/go/admin/cache/mocks/token_cache.go index 88a1bef81cd..0af58b381f8 100644 --- a/flyteidl/clients/go/admin/cache/mocks/token_cache.go +++ b/flyteidl/clients/go/admin/cache/mocks/token_cache.go @@ -12,16 +12,6 @@ type TokenCache struct { mock.Mock } -// CondBroadcast provides a mock function with given fields: -func (_m *TokenCache) CondBroadcast() { - _m.Called() -} - -// CondWait provides a mock function with given fields: -func (_m *TokenCache) CondWait() { - _m.Called() -} - type TokenCache_GetToken struct { *mock.Call } @@ -63,50 +53,6 @@ func (_m *TokenCache) GetToken() (*oauth2.Token, error) { return r0, r1 } -// Lock provides a mock function with given fields: -func (_m *TokenCache) Lock() { - _m.Called() -} - -type TokenCache_PurgeIfEquals struct { - *mock.Call -} - -func (_m TokenCache_PurgeIfEquals) Return(_a0 bool, _a1 error) *TokenCache_PurgeIfEquals { - return &TokenCache_PurgeIfEquals{Call: _m.Call.Return(_a0, _a1)} -} - -func (_m *TokenCache) OnPurgeIfEquals(t *oauth2.Token) *TokenCache_PurgeIfEquals { - c_call := _m.On("PurgeIfEquals", t) - return &TokenCache_PurgeIfEquals{Call: c_call} -} - -func (_m *TokenCache) OnPurgeIfEqualsMatch(matchers ...interface{}) *TokenCache_PurgeIfEquals { - c_call := _m.On("PurgeIfEquals", matchers...) - return &TokenCache_PurgeIfEquals{Call: c_call} -} - -// PurgeIfEquals provides a mock function with given fields: t -func (_m *TokenCache) PurgeIfEquals(t *oauth2.Token) (bool, error) { - ret := _m.Called(t) - - var r0 bool - if rf, ok := ret.Get(0).(func(*oauth2.Token) bool); ok { - r0 = rf(t) - } else { - r0 = ret.Get(0).(bool) - } - - var r1 error - if rf, ok := ret.Get(1).(func(*oauth2.Token) error); ok { - r1 = rf(t) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - type TokenCache_SaveToken struct { *mock.Call } @@ -138,40 +84,3 @@ func (_m *TokenCache) SaveToken(token *oauth2.Token) error { return r0 } - -type TokenCache_TryLock struct { - *mock.Call -} - -func (_m TokenCache_TryLock) Return(_a0 bool) *TokenCache_TryLock { - return &TokenCache_TryLock{Call: _m.Call.Return(_a0)} -} - -func (_m *TokenCache) OnTryLock() *TokenCache_TryLock { - c_call := _m.On("TryLock") - return &TokenCache_TryLock{Call: c_call} -} - -func (_m *TokenCache) OnTryLockMatch(matchers ...interface{}) *TokenCache_TryLock { - c_call := _m.On("TryLock", matchers...) - return &TokenCache_TryLock{Call: c_call} -} - -// TryLock provides a mock function with given fields: -func (_m *TokenCache) TryLock() bool { - ret := _m.Called() - - var r0 bool - if rf, ok := ret.Get(0).(func() bool); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(bool) - } - - return r0 -} - -// Unlock provides a mock function with given fields: -func (_m *TokenCache) Unlock() { - _m.Called() -} diff --git a/flyteidl/clients/go/admin/cache/token_cache.go b/flyteidl/clients/go/admin/cache/token_cache.go index f946c8bdea6..e4e2b7e17f2 100644 --- a/flyteidl/clients/go/admin/cache/token_cache.go +++ b/flyteidl/clients/go/admin/cache/token_cache.go @@ -1,39 +1,14 @@ package cache -import ( - "fmt" - "golang.org/x/oauth2" -) +import "golang.org/x/oauth2" //go:generate mockery -all -case=underscore -var ( - ErrNotFound = fmt.Errorf("secret not found in keyring") -) - // TokenCache defines the interface needed to cache and retrieve oauth tokens. type TokenCache interface { // SaveToken saves the token securely to cache. SaveToken(token *oauth2.Token) error - // GetToken retrieves the token from the cache. + // Retrieves the token from the cache. GetToken() (*oauth2.Token, error) - - // PurgeIfEquals purges the token from the cache. - PurgeIfEquals(t *oauth2.Token) (bool, error) - - // Lock the cache. - Lock() - - // TryLock tries to lock the cache. - TryLock() bool - - // Unlock the cache. - Unlock() - - // CondWait waits for the condition to be true. - CondWait() - - // CondSignalCondBroadcast signals the condition. - CondBroadcast() } diff --git a/flyteidl/clients/go/admin/cache/token_cache_inmemory.go b/flyteidl/clients/go/admin/cache/token_cache_inmemory.go index 46e134ec555..9c6223fc06b 100644 --- a/flyteidl/clients/go/admin/cache/token_cache_inmemory.go +++ b/flyteidl/clients/go/admin/cache/token_cache_inmemory.go @@ -2,62 +2,23 @@ package cache import ( "fmt" - "sync" - "sync/atomic" "golang.org/x/oauth2" ) type TokenCacheInMemoryProvider struct { - token atomic.Value - mu *sync.Mutex - cond *sync.Cond + token *oauth2.Token } func (t *TokenCacheInMemoryProvider) SaveToken(token *oauth2.Token) error { - t.token.Store(token) + t.token = token return nil } -func (t *TokenCacheInMemoryProvider) GetToken() (*oauth2.Token, error) { - tkn := t.token.Load() - if tkn == nil { +func (t TokenCacheInMemoryProvider) GetToken() (*oauth2.Token, error) { + if t.token == nil { return nil, fmt.Errorf("cannot find token in cache") } - return tkn.(*oauth2.Token), nil -} - -func (t *TokenCacheInMemoryProvider) PurgeIfEquals(existing *oauth2.Token) (bool, error) { - return t.token.CompareAndSwap(existing, nil), nil -} - -func (t *TokenCacheInMemoryProvider) Lock() { - t.mu.Lock() -} - -func (t *TokenCacheInMemoryProvider) TryLock() bool { - return t.mu.TryLock() -} - -func (t *TokenCacheInMemoryProvider) Unlock() { - t.mu.Unlock() -} - -// CondWait waits for the condition to be true. -func (t *TokenCacheInMemoryProvider) CondWait() { - t.cond.Wait() -} - -// CondBroadcast signals the condition. -func (t *TokenCacheInMemoryProvider) CondBroadcast() { - t.cond.Broadcast() -} - -func NewTokenCacheInMemoryProvider() *TokenCacheInMemoryProvider { - return &TokenCacheInMemoryProvider{ - mu: &sync.Mutex{}, - token: atomic.Value{}, - cond: sync.NewCond(&sync.Mutex{}), - } + return t.token, nil } diff --git a/flyteidl/clients/go/admin/client_builder.go b/flyteidl/clients/go/admin/client_builder.go index 0d1341bf7b8..25b263ecf1c 100644 --- a/flyteidl/clients/go/admin/client_builder.go +++ b/flyteidl/clients/go/admin/client_builder.go @@ -40,7 +40,7 @@ func (cb *ClientsetBuilder) WithDialOptions(opts ...grpc.DialOption) *ClientsetB // Build the clientset using the current state of the ClientsetBuilder func (cb *ClientsetBuilder) Build(ctx context.Context) (*Clientset, error) { if cb.tokenCache == nil { - cb.tokenCache = cache.NewTokenCacheInMemoryProvider() + cb.tokenCache = &cache.TokenCacheInMemoryProvider{} } if cb.config == nil { diff --git a/flyteidl/clients/go/admin/client_builder_test.go b/flyteidl/clients/go/admin/client_builder_test.go index 89bcc385505..c871bcb326d 100644 --- a/flyteidl/clients/go/admin/client_builder_test.go +++ b/flyteidl/clients/go/admin/client_builder_test.go @@ -17,9 +17,9 @@ func TestClientsetBuilder_Build(t *testing.T) { cb := NewClientsetBuilder().WithConfig(&Config{ UseInsecureConnection: true, Endpoint: config.URL{URL: *u}, - }).WithTokenCache(cache.NewTokenCacheInMemoryProvider()) + }).WithTokenCache(&cache.TokenCacheInMemoryProvider{}) ctx := context.Background() _, err := cb.Build(ctx) assert.NoError(t, err) - assert.True(t, reflect.TypeOf(cb.tokenCache) == reflect.TypeOf(cache.NewTokenCacheInMemoryProvider())) + assert.True(t, reflect.TypeOf(cb.tokenCache) == reflect.TypeOf(&cache.TokenCacheInMemoryProvider{})) } diff --git a/flyteidl/clients/go/admin/client_test.go b/flyteidl/clients/go/admin/client_test.go index 042a8266921..eb19b76f471 100644 --- a/flyteidl/clients/go/admin/client_test.go +++ b/flyteidl/clients/go/admin/client_test.go @@ -255,8 +255,6 @@ func TestGetAuthenticationDialOptionPkce(t *testing.T) { mockAuthClient := new(mocks.AuthMetadataServiceClient) mockTokenCache.OnGetTokenMatch().Return(&tokenData, nil) mockTokenCache.OnSaveTokenMatch(mock.Anything).Return(nil) - mockTokenCache.On("Lock").Return() - mockTokenCache.On("Unlock").Return() mockAuthClient.OnGetOAuth2MetadataMatch(mock.Anything, mock.Anything).Return(metadata, nil) mockAuthClient.OnGetPublicClientConfigMatch(mock.Anything, mock.Anything).Return(clientMetatadata, nil) tokenSourceProvider, err := NewTokenSourceProvider(ctx, adminServiceConfig, mockTokenCache, mockAuthClient) @@ -290,7 +288,7 @@ func Test_getPkceAuthTokenSource(t *testing.T) { assert.NoError(t, err) // populate the cache - tokenCache := cache.NewTokenCacheInMemoryProvider() + tokenCache := &cache.TokenCacheInMemoryProvider{} assert.NoError(t, tokenCache.SaveToken(&tokenData)) baseOrchestrator := tokenorchestrator.BaseTokenOrchestrator{ diff --git a/flyteidl/clients/go/admin/deviceflow/token_orchestrator_test.go b/flyteidl/clients/go/admin/deviceflow/token_orchestrator_test.go index 4edf8c83f6f..3ecbeb746e3 100644 --- a/flyteidl/clients/go/admin/deviceflow/token_orchestrator_test.go +++ b/flyteidl/clients/go/admin/deviceflow/token_orchestrator_test.go @@ -25,7 +25,7 @@ import ( func TestFetchFromAuthFlow(t *testing.T) { ctx := context.Background() t.Run("fetch from auth flow", func(t *testing.T) { - tokenCache := cache.NewTokenCacheInMemoryProvider() + tokenCache := &cache.TokenCacheInMemoryProvider{} orchestrator, err := NewDeviceFlowTokenOrchestrator(tokenorchestrator.BaseTokenOrchestrator{ ClientConfig: &oauth.Config{ Config: &oauth2.Config{ @@ -98,7 +98,7 @@ func TestFetchFromAuthFlow(t *testing.T) { })) defer fakeServer.Close() - tokenCache := cache.NewTokenCacheInMemoryProvider() + tokenCache := &cache.TokenCacheInMemoryProvider{} orchestrator, err := NewDeviceFlowTokenOrchestrator(tokenorchestrator.BaseTokenOrchestrator{ ClientConfig: &oauth.Config{ Config: &oauth2.Config{ diff --git a/flyteidl/clients/go/admin/pkce/auth_flow_orchestrator_test.go b/flyteidl/clients/go/admin/pkce/auth_flow_orchestrator_test.go index ca1973ea669..dc1c80f63ac 100644 --- a/flyteidl/clients/go/admin/pkce/auth_flow_orchestrator_test.go +++ b/flyteidl/clients/go/admin/pkce/auth_flow_orchestrator_test.go @@ -16,7 +16,7 @@ import ( func TestFetchFromAuthFlow(t *testing.T) { ctx := context.Background() t.Run("fetch from auth flow", func(t *testing.T) { - tokenCache := cache.NewTokenCacheInMemoryProvider() + tokenCache := &cache.TokenCacheInMemoryProvider{} orchestrator, err := NewTokenOrchestrator(tokenorchestrator.BaseTokenOrchestrator{ ClientConfig: &oauth.Config{ Config: &oauth2.Config{ diff --git a/flyteidl/clients/go/admin/token_source_provider.go b/flyteidl/clients/go/admin/token_source_provider.go index eb6ecac1e53..44ed5048e24 100644 --- a/flyteidl/clients/go/admin/token_source_provider.go +++ b/flyteidl/clients/go/admin/token_source_provider.go @@ -191,7 +191,7 @@ func NewClientCredentialsTokenSourceProvider(ctx context.Context, cfg *Config, s } secret = strings.TrimSpace(secret) if tokenCache == nil { - tokenCache = cache.NewTokenCacheInMemoryProvider() + tokenCache = &cache.TokenCacheInMemoryProvider{} } return ClientCredentialsTokenSourceProvider{ ccConfig: clientcredentials.Config{ @@ -249,15 +249,13 @@ func (s *customTokenSource) Token() (*oauth2.Token, error) { return nil }) if err != nil { - logger.Warnf(s.ctx, "failed to get token: %v", err) - return nil, fmt.Errorf("failed to get token: %w", err) + return nil, err } - logger.Infof(s.ctx, "retrieved token with expiry %v", token.Expiry) err = s.tokenCache.SaveToken(token) if err != nil { - logger.Warnf(s.ctx, "failed to cache token: %v", err) + logger.Warnf(s.ctx, "failed to cache token: %w", err) } return token, nil diff --git a/flyteidl/clients/go/admin/token_source_provider_test.go b/flyteidl/clients/go/admin/token_source_provider_test.go index 43d0fdd9280..63fc1aa56ec 100644 --- a/flyteidl/clients/go/admin/token_source_provider_test.go +++ b/flyteidl/clients/go/admin/token_source_provider_test.go @@ -127,9 +127,7 @@ func TestCustomTokenSource_Token(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { tokenCache := &tokenCacheMocks.TokenCache{} - tokenCache.OnGetToken().Return(test.token, nil).Maybe() - tokenCache.On("Lock").Return().Maybe() - tokenCache.On("Unlock").Return().Maybe() + tokenCache.OnGetToken().Return(test.token, nil).Once() provider, err := NewClientCredentialsTokenSourceProvider(ctx, cfg, []string{}, "", tokenCache, "") assert.NoError(t, err) source, err := provider.GetTokenSource(ctx) diff --git a/flyteidl/clients/go/admin/tokenorchestrator/base_token_orchestrator.go b/flyteidl/clients/go/admin/tokenorchestrator/base_token_orchestrator.go index 9285a82ac94..6fddeee67e4 100644 --- a/flyteidl/clients/go/admin/tokenorchestrator/base_token_orchestrator.go +++ b/flyteidl/clients/go/admin/tokenorchestrator/base_token_orchestrator.go @@ -3,6 +3,8 @@ package tokenorchestrator import ( "context" "fmt" + "time" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/service" "golang.org/x/oauth2" @@ -52,29 +54,22 @@ func (t BaseTokenOrchestrator) FetchTokenFromCacheOrRefreshIt(ctx context.Contex return nil, err } - if token.Valid() { - return token, nil - } - - t.TokenCache.Lock() - defer t.TokenCache.Unlock() - - token, err = t.TokenCache.GetToken() - if err != nil { - return nil, err + if !token.Valid() { + return nil, fmt.Errorf("token from cache is invalid") } - if token.Valid() { + // If token doesn't need to be refreshed, return it. + if time.Now().Before(token.Expiry.Add(-tokenRefreshGracePeriod.Duration)) { + logger.Infof(ctx, "found the token in the cache") return token, nil } + token.Expiry = token.Expiry.Add(-tokenRefreshGracePeriod.Duration) token, err = t.RefreshToken(ctx, token) if err != nil { return nil, fmt.Errorf("failed to refresh token using cached token. Error: %w", err) } - token.Expiry = token.Expiry.Add(-tokenRefreshGracePeriod.Duration) - if !token.Valid() { return nil, fmt.Errorf("refreshed token is invalid") } diff --git a/flyteidl/clients/go/admin/tokenorchestrator/base_token_orchestrator_test.go b/flyteidl/clients/go/admin/tokenorchestrator/base_token_orchestrator_test.go index 0a1a9f4985a..ed4afa0ff05 100644 --- a/flyteidl/clients/go/admin/tokenorchestrator/base_token_orchestrator_test.go +++ b/flyteidl/clients/go/admin/tokenorchestrator/base_token_orchestrator_test.go @@ -26,7 +26,7 @@ func TestRefreshTheToken(t *testing.T) { ClientID: "dummyClient", }, } - tokenCacheProvider := cache.NewTokenCacheInMemoryProvider() + tokenCacheProvider := &cache.TokenCacheInMemoryProvider{} orchestrator := BaseTokenOrchestrator{ ClientConfig: clientConf, TokenCache: tokenCacheProvider, @@ -58,7 +58,7 @@ func TestFetchFromCache(t *testing.T) { mockAuthClient.OnGetPublicClientConfigMatch(mock.Anything, mock.Anything).Return(clientMetatadata, nil) t.Run("no token in cache", func(t *testing.T) { - tokenCacheProvider := cache.NewTokenCacheInMemoryProvider() + tokenCacheProvider := &cache.TokenCacheInMemoryProvider{} orchestrator, err := NewBaseTokenOrchestrator(ctx, tokenCacheProvider, mockAuthClient) @@ -69,7 +69,7 @@ func TestFetchFromCache(t *testing.T) { }) t.Run("token in cache", func(t *testing.T) { - tokenCacheProvider := cache.NewTokenCacheInMemoryProvider() + tokenCacheProvider := &cache.TokenCacheInMemoryProvider{} orchestrator, err := NewBaseTokenOrchestrator(ctx, tokenCacheProvider, mockAuthClient) assert.NoError(t, err) fileData, _ := os.ReadFile("testdata/token.json") @@ -86,7 +86,7 @@ func TestFetchFromCache(t *testing.T) { }) t.Run("expired token in cache", func(t *testing.T) { - tokenCacheProvider := cache.NewTokenCacheInMemoryProvider() + tokenCacheProvider := &cache.TokenCacheInMemoryProvider{} orchestrator, err := NewBaseTokenOrchestrator(ctx, tokenCacheProvider, mockAuthClient) assert.NoError(t, err) fileData, _ := os.ReadFile("testdata/token.json") diff --git a/flyteidl/clients/go/assets/admin.swagger.json b/flyteidl/clients/go/assets/admin.swagger.json index 7114cbcaf1e..e60790b729d 100644 --- a/flyteidl/clients/go/assets/admin.swagger.json +++ b/flyteidl/clients/go/assets/admin.swagger.json @@ -12389,19 +12389,6 @@ }, "description": "This configuration allows executing raw containers in Flyte using the Flyte CoPilot system.\nFlyte CoPilot, eliminates the needs of flytekit or sdk inside the container. Any inputs required by the users container are side-loaded in the input_path\nAny outputs generated by the user container - within output_path are automatically uploaded." }, - "coreEnumType": { - "type": "object", - "properties": { - "values": { - "type": "array", - "items": { - "type": "string" - }, - "description": "Predefined set of enum values." - } - }, - "description": "Enables declaring enum types, with predefined string values\nFor len(values) \u003e 0, the first value in the ordered list is regarded as the default value. If you wish\nTo provide no defaults, make the first value as undefined." - }, "coreError": { "type": "object", "properties": { @@ -12806,7 +12793,7 @@ "description": "A blob might have specialized implementation details depending on associated metadata." }, "enum_type": { - "$ref": "#/definitions/coreEnumType", + "$ref": "#/definitions/flyteidlcoreEnumType", "description": "Defines an enum with pre-defined string values." }, "structured_dataset_type": { @@ -14326,6 +14313,19 @@ }, "title": "Metadata for a WorkflowNode" }, + "flyteidlcoreEnumType": { + "type": "object", + "properties": { + "values": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Predefined set of enum values." + } + }, + "description": "Enables declaring enum types, with predefined string values\nFor len(values) \u003e 0, the first value in the ordered list is regarded as the default value. If you wish\nTo provide no defaults, make the first value as undefined." + }, "flyteidlcoreKeyValuePair": { "type": "object", "properties": { @@ -14469,11 +14469,11 @@ "properties": { "@type": { "type": "string", - "description": "A URL/resource name that uniquely identifies the type of the serialized\nprotocol buffer message. This string must contain at least\none \"/\" character. The last segment of the URL's path must represent\nthe fully qualified name of the type (as in\n`path/google.protobuf.Duration`). The name should be in a canonical form\n(e.g., leading \".\" is not accepted).\n\nIn practice, teams usually precompile into the binary all types that they\nexpect it to use in the context of Any. However, for URLs which use the\nscheme `http`, `https`, or no scheme, one can optionally set up a type\nserver that maps type URLs to message definitions as follows:\n\n* If no scheme is provided, `https` is assumed.\n* An HTTP GET on the URL must yield a [google.protobuf.Type][]\n value in binary format, or produce an error.\n* Applications are allowed to cache lookup results based on the\n URL, or have them precompiled into a binary to avoid any\n lookup. Therefore, binary compatibility needs to be preserved\n on changes to types. (Use versioned type names to manage\n breaking changes.)\n\nNote: this functionality is not currently available in the official\nprotobuf release, and it is not used for type URLs beginning with\ntype.googleapis.com.\n\nSchemes other than `http`, `https` (or the empty scheme) might be\nused with implementation specific semantics." + "description": "A URL/resource name that uniquely identifies the type of the serialized\nprotocol buffer message. This string must contain at least\none \"/\" character. The last segment of the URL's path must represent\nthe fully qualified name of the type (as in\n`path/google.protobuf.Duration`). The name should be in a canonical form\n(e.g., leading \".\" is not accepted).\n\nIn practice, teams usually precompile into the binary all types that they\nexpect it to use in the context of Any. However, for URLs which use the\nscheme `http`, `https`, or no scheme, one can optionally set up a type\nserver that maps type URLs to message definitions as follows:\n\n* If no scheme is provided, `https` is assumed.\n* An HTTP GET on the URL must yield a [google.protobuf.Type][]\n value in binary format, or produce an error.\n* Applications are allowed to cache lookup results based on the\n URL, or have them precompiled into a binary to avoid any\n lookup. Therefore, binary compatibility needs to be preserved\n on changes to types. (Use versioned type names to manage\n breaking changes.)\n\nNote: this functionality is not currently available in the official\nprotobuf release, and it is not used for type URLs beginning with\ntype.googleapis.com. As of May 2023, there are no widely used type server\nimplementations and no plans to implement one.\n\nSchemes other than `http`, `https` (or the empty scheme) might be\nused with implementation specific semantics." } }, "additionalProperties": {}, - "description": "`Any` contains an arbitrary serialized protocol buffer message along with a\nURL that describes the type of the serialized message.\n\nProtobuf library provides support to pack/unpack Any values in the form\nof utility functions or additional generated methods of the Any type.\n\nExample 1: Pack and unpack a message in C++.\n\n Foo foo = ...;\n Any any;\n any.PackFrom(foo);\n ...\n if (any.UnpackTo(\u0026foo)) {\n ...\n }\n\nExample 2: Pack and unpack a message in Java.\n\n Foo foo = ...;\n Any any = Any.pack(foo);\n ...\n if (any.is(Foo.class)) {\n foo = any.unpack(Foo.class);\n }\n // or ...\n if (any.isSameTypeAs(Foo.getDefaultInstance())) {\n foo = any.unpack(Foo.getDefaultInstance());\n }\n\nExample 3: Pack and unpack a message in Python.\n\n foo = Foo(...)\n any = Any()\n any.Pack(foo)\n ...\n if any.Is(Foo.DESCRIPTOR):\n any.Unpack(foo)\n ...\n\nExample 4: Pack and unpack a message in Go\n\n foo := \u0026pb.Foo{...}\n any, err := anypb.New(foo)\n if err != nil {\n ...\n }\n ...\n foo := \u0026pb.Foo{}\n if err := any.UnmarshalTo(foo); err != nil {\n ...\n }\n\nThe pack methods provided by protobuf library will by default use\n'type.googleapis.com/full.type.name' as the type URL and the unpack\nmethods only use the fully qualified type name after the last '/'\nin the type URL, for example \"foo.bar.com/x/y.z\" will yield type\nname \"y.z\".\n\nJSON\n\nThe JSON representation of an `Any` value uses the regular\nrepresentation of the deserialized, embedded message, with an\nadditional field `@type` which contains the type URL. Example:\n\n package google.profile;\n message Person {\n string first_name = 1;\n string last_name = 2;\n }\n\n {\n \"@type\": \"type.googleapis.com/google.profile.Person\",\n \"firstName\": \u003cstring\u003e,\n \"lastName\": \u003cstring\u003e\n }\n\nIf the embedded message type is well-known and has a custom JSON\nrepresentation, that representation will be embedded adding a field\n`value` which holds the custom JSON in addition to the `@type`\nfield. Example (for message [google.protobuf.Duration][]):\n\n {\n \"@type\": \"type.googleapis.com/google.protobuf.Duration\",\n \"value\": \"1.212s\"\n }" + "description": "`Any` contains an arbitrary serialized protocol buffer message along with a\nURL that describes the type of the serialized message.\n\nProtobuf library provides support to pack/unpack Any values in the form\nof utility functions or additional generated methods of the Any type.\n\nExample 1: Pack and unpack a message in C++.\n\n Foo foo = ...;\n Any any;\n any.PackFrom(foo);\n ...\n if (any.UnpackTo(\u0026foo)) {\n ...\n }\n\nExample 2: Pack and unpack a message in Java.\n\n Foo foo = ...;\n Any any = Any.pack(foo);\n ...\n if (any.is(Foo.class)) {\n foo = any.unpack(Foo.class);\n }\n // or ...\n if (any.isSameTypeAs(Foo.getDefaultInstance())) {\n foo = any.unpack(Foo.getDefaultInstance());\n }\n\n Example 3: Pack and unpack a message in Python.\n\n foo = Foo(...)\n any = Any()\n any.Pack(foo)\n ...\n if any.Is(Foo.DESCRIPTOR):\n any.Unpack(foo)\n ...\n\n Example 4: Pack and unpack a message in Go\n\n foo := \u0026pb.Foo{...}\n any, err := anypb.New(foo)\n if err != nil {\n ...\n }\n ...\n foo := \u0026pb.Foo{}\n if err := any.UnmarshalTo(foo); err != nil {\n ...\n }\n\nThe pack methods provided by protobuf library will by default use\n'type.googleapis.com/full.type.name' as the type URL and the unpack\nmethods only use the fully qualified type name after the last '/'\nin the type URL, for example \"foo.bar.com/x/y.z\" will yield type\nname \"y.z\".\n\nJSON\n====\nThe JSON representation of an `Any` value uses the regular\nrepresentation of the deserialized, embedded message, with an\nadditional field `@type` which contains the type URL. Example:\n\n package google.profile;\n message Person {\n string first_name = 1;\n string last_name = 2;\n }\n\n {\n \"@type\": \"type.googleapis.com/google.profile.Person\",\n \"firstName\": \u003cstring\u003e,\n \"lastName\": \u003cstring\u003e\n }\n\nIf the embedded message type is well-known and has a custom JSON\nrepresentation, that representation will be embedded adding a field\n`value` which holds the custom JSON in addition to the `@type`\nfield. Example (for message [google.protobuf.Duration][]):\n\n {\n \"@type\": \"type.googleapis.com/google.protobuf.Duration\",\n \"value\": \"1.212s\"\n }" }, "protobufNullValue": { "type": "string", @@ -14481,7 +14481,7 @@ "NULL_VALUE" ], "default": "NULL_VALUE", - "description": "`NullValue` is a singleton enumeration to represent the null value for the\n`Value` type union.\n\n The JSON representation for `NullValue` is JSON `null`.\n\n - NULL_VALUE: Null value." + "description": "`NullValue` is a singleton enumeration to represent the null value for the\n`Value` type union.\n\nThe JSON representation for `NullValue` is JSON `null`.\n\n - NULL_VALUE: Null value." }, "serviceAdminServiceCreateTaskBody": { "type": "object", diff --git a/flyteidl/go.mod b/flyteidl/go.mod index c6c39b7ecae..1389920eb3f 100644 --- a/flyteidl/go.mod +++ b/flyteidl/go.mod @@ -47,7 +47,7 @@ require ( github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/fatih/color v1.13.0 // indirect - github.com/flyteorg/stow v0.3.10 // indirect + github.com/flyteorg/stow v0.3.8 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/flyteidl/go.sum b/flyteidl/go.sum index 856319bed6e..29545665484 100644 --- a/flyteidl/go.sum +++ b/flyteidl/go.sum @@ -58,8 +58,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= -github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= +github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/flyteplugins/go.mod b/flyteplugins/go.mod index 9e110c583dd..35c768fbd8d 100644 --- a/flyteplugins/go.mod +++ b/flyteplugins/go.mod @@ -72,7 +72,7 @@ require ( github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/fatih/color v1.13.0 // indirect - github.com/flyteorg/stow v0.3.10 // indirect + github.com/flyteorg/stow v0.3.8 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/go-logr/logr v1.4.1 // indirect diff --git a/flyteplugins/go.sum b/flyteplugins/go.sum index f6bbc892d22..a4218686bdf 100644 --- a/flyteplugins/go.sum +++ b/flyteplugins/go.sum @@ -141,8 +141,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= -github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= +github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/flytepropeller/go.mod b/flytepropeller/go.mod index 9d9eb637729..04e5455c074 100644 --- a/flytepropeller/go.mod +++ b/flytepropeller/go.mod @@ -84,7 +84,7 @@ require ( github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect - github.com/flyteorg/stow v0.3.10 // indirect + github.com/flyteorg/stow v0.3.8 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/flytepropeller/go.sum b/flytepropeller/go.sum index 2525211e6bc..c39cf98d503 100644 --- a/flytepropeller/go.sum +++ b/flytepropeller/go.sum @@ -151,8 +151,8 @@ github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJ github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= -github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= +github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/flytestdlib/go.mod b/flytestdlib/go.mod index 067fd8ec2e5..9d1d7c491f3 100644 --- a/flytestdlib/go.mod +++ b/flytestdlib/go.mod @@ -9,7 +9,7 @@ require ( github.com/ernesto-jimenez/gogen v0.0.0-20180125220232-d7d4131e6607 github.com/fatih/color v1.13.0 github.com/fatih/structtag v1.2.0 - github.com/flyteorg/stow v0.3.10 + github.com/flyteorg/stow v0.3.8 github.com/fsnotify/fsnotify v1.6.0 github.com/ghodss/yaml v1.0.0 github.com/go-gormigrate/gormigrate/v2 v2.1.1 diff --git a/flytestdlib/go.sum b/flytestdlib/go.sum index 3a0c13475b7..4ea3652f9d6 100644 --- a/flytestdlib/go.sum +++ b/flytestdlib/go.sum @@ -115,8 +115,8 @@ github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/structtag v1.2.0 h1:/OdNE99OxoI/PqaW/SuSK9uxxT3f/tcSZgon/ssNSx4= github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94= -github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= -github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= +github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= diff --git a/flytestdlib/storage/stow_store.go b/flytestdlib/storage/stow_store.go index 2d23059972b..d10b706ac7b 100644 --- a/flytestdlib/storage/stow_store.go +++ b/flytestdlib/storage/stow_store.go @@ -357,7 +357,7 @@ func (s *StowStore) CreateSignedURL(ctx context.Context, reference DataReference return SignedURLResponse{}, err } - resp, err := c.PreSignRequest(ctx, properties.Scope, key, stow.PresignRequestParams{ + urlStr, err := c.PreSignRequest(ctx, properties.Scope, key, stow.PresignRequestParams{ ExpiresIn: properties.ExpiresIn, ContentMD5: properties.ContentMD5, }) @@ -366,7 +366,7 @@ func (s *StowStore) CreateSignedURL(ctx context.Context, reference DataReference return SignedURLResponse{}, err } - urlVal, err := url.Parse(resp.Url) + urlVal, err := url.Parse(urlStr) if err != nil { return SignedURLResponse{}, err } diff --git a/flytestdlib/storage/stow_store_test.go b/flytestdlib/storage/stow_store_test.go index c66ecfad4eb..d888977c577 100644 --- a/flytestdlib/storage/stow_store_test.go +++ b/flytestdlib/storage/stow_store_test.go @@ -53,8 +53,8 @@ type mockStowContainer struct { // CreateSignedURL creates a signed url with the provided properties. func (m mockStowContainer) PreSignRequest(_ context.Context, _ stow.ClientMethod, s string, - _ stow.PresignRequestParams) (urlResp stow.PresignResponse, err error) { - return stow.PresignResponse{Url: s}, nil + _ stow.PresignRequestParams) (url string, err error) { + return s, nil } func (m mockStowContainer) ID() string { diff --git a/go.mod b/go.mod index 40daf6cfeaa..ed927351362 100644 --- a/go.mod +++ b/go.mod @@ -77,7 +77,7 @@ require ( github.com/felixge/httpsnoop v1.0.4 // indirect github.com/flyteorg/flyte/flyteidl v1.10.7-b0.0.20240109185447-d86f91995c44 // indirect github.com/flyteorg/flyte/flyteplugins v1.10.7-0.20240104005944-64548962d375 // indirect - github.com/flyteorg/stow v0.3.10 // indirect + github.com/flyteorg/stow v0.3.8 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/ghodss/yaml v1.0.1-0.20220118164431-d8423dcdf344 // indirect github.com/go-gormigrate/gormigrate/v2 v2.1.1 // indirect diff --git a/go.sum b/go.sum index bb236557ea6..f122afa2a44 100644 --- a/go.sum +++ b/go.sum @@ -284,8 +284,8 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/stow v0.3.10 h1:uEe+tI+CGKn21H93uXp9z05hqynEki2BO9KkW/GweY8= -github.com/flyteorg/stow v0.3.10/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= +github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4= +github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=