diff --git a/flytectl/pkg/pkce/token_cache_keyring.go b/flytectl/pkg/pkce/token_cache_keyring.go index ff547827bd..afcfa74db5 100644 --- a/flytectl/pkg/pkce/token_cache_keyring.go +++ b/flytectl/pkg/pkce/token_cache_keyring.go @@ -1,11 +1,13 @@ package pkce import ( + "context" "encoding/json" "fmt" "sync" "github.com/flyteorg/flyte/flyteidl/clients/go/admin/cache" + "github.com/flyteorg/flyte/flytestdlib/logger" "github.com/zalando/go-keyring" "golang.org/x/oauth2" @@ -21,6 +23,7 @@ type TokenCacheKeyringProvider struct { ServiceName string ServiceUser string mu *sync.Mutex + condLocker *cache.NoopLocker cond *sync.Cond } @@ -28,11 +31,8 @@ func (t *TokenCacheKeyringProvider) PurgeIfEquals(existing *oauth2.Token) (bool, if existingBytes, err := json.Marshal(existing); err != nil { return false, fmt.Errorf("unable to marshal token to save in cache due to %w", err) } else if tokenJSON, err := keyring.Get(t.ServiceName, t.ServiceUser); err != nil { - if err.Error() == "secret not found in keyring" { - return false, fmt.Errorf("unable to read token from cache. Error: %w", cache.ErrNotFound) - } - - return false, fmt.Errorf("unable to read token from cache. Error: %w", err) + logger.Warnf(context.Background(), "unable to read token from cache but not failing the purge as the token might not have been saved at all. Error: %v", err) + return true, nil } else if tokenJSON != string(existingBytes) { return false, nil } @@ -54,12 +54,30 @@ func (t *TokenCacheKeyringProvider) TryLock() bool { return t.mu.TryLock() } -// CondWait waits for the condition to be true. +// CondWait adds the current go routine to the condition waitlist and waits for another go routine to notify using CondBroadcast +// The current usage is that one who was able to acquire the lock using TryLock is the one who gets a valid token and notifies all the waitlist requesters so that they can use the new valid token. +// It also locks the Locker in the condition variable as the semantics of Wait is that it unlocks the Locker after adding +// the consumer to the waitlist and before blocking on notification. +// We use the condLocker which is noOp locker to get added to waitlist for notifications. +// The underlying notifcationList doesn't need to be guarded as it implementation is atomic and is thread safe +// Refer https://go.dev/src/runtime/sema.go +// Following is the function and its comments +// notifyListAdd adds the caller to a notify list such that it can receive +// notifications. The caller must eventually call notifyListWait to wait for +// such a notification, passing the returned ticket number. +// +// func notifyListAdd(l *notifyList) uint32 { +// // This may be called concurrently, for example, when called from +// // sync.Cond.Wait while holding a RWMutex in read mode. +// return l.wait.Add(1) - 1 +// } func (t *TokenCacheKeyringProvider) CondWait() { + t.condLocker.Lock() t.cond.Wait() + t.condLocker.Unlock() } -// CondBroadcast signals the condition. +// CondBroadcast broadcasts the condition. func (t *TokenCacheKeyringProvider) CondBroadcast() { t.cond.Broadcast() } @@ -103,9 +121,11 @@ func (t *TokenCacheKeyringProvider) GetToken() (*oauth2.Token, error) { } func NewTokenCacheKeyringProvider(serviceName, serviceUser string) *TokenCacheKeyringProvider { + condLocker := &cache.NoopLocker{} return &TokenCacheKeyringProvider{ mu: &sync.Mutex{}, - cond: sync.NewCond(&sync.Mutex{}), + condLocker: condLocker, + cond: sync.NewCond(condLocker), ServiceName: serviceName, ServiceUser: serviceUser, }