diff --git a/docs/_templates/base.html b/docs/_templates/base.html index f3c5ef1a49..7e0e81480a 100644 --- a/docs/_templates/base.html +++ b/docs/_templates/base.html @@ -98,6 +98,13 @@ } }); + +
{% block body %} diff --git a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go index b13070534e..7569abd90e 100644 --- a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go +++ b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/cache.go @@ -44,11 +44,15 @@ type ResourceCache struct { // A wrapper for each item in the cache. type CacheItem struct { State - Resource webapi.Resource } func (c CacheItem) IsTerminal() bool { + if c.Resource != nil { + if resource, ok := c.Resource.(interface{ IsTerminal() bool }); ok { + return resource.IsTerminal() + } + } return c.State.Phase.IsTerminal() } @@ -80,7 +84,7 @@ func (q *ResourceCache) SyncResource(ctx context.Context, batch cache.Batch) ( logger.Debugf(ctx, "Sync loop - processing resource with cache key [%s]", resource.GetID()) - if cacheItem.State.Phase.IsTerminal() { + if cacheItem.IsTerminal() { logger.Debugf(ctx, "Sync loop - resource cache key [%v] in terminal state [%s]", resource.GetID()) resp = append(resp, cache.ItemSyncResponse{ diff --git a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go index 9c98521897..1d0e31f71c 100644 --- a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go +++ b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go @@ -117,13 +117,19 @@ func (c CorePlugin) Abort(ctx context.Context, tCtx core.TaskExecutionContext) e } func (c CorePlugin) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error { + cacheItemID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName() + err := c.cache.DeleteDelayed(cacheItemID) + if err != nil { + logger.Errorf(ctx, "Failed to delete resource [%v] from cache. Error: %v", cacheItemID, err) + return fmt.Errorf("failed to delete resource [%v] from cache. 
Error: %w", cacheItemID, err) + } + if len(c.p.GetConfig().ResourceQuotas) == 0 { // If there are no defined quotas, there is nothing to cleanup. return nil } - logger.Infof(ctx, "Attempting to finalize resource [%v].", - tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()) + logger.Infof(ctx, "Attempting to finalize resource [%v].", cacheItemID) return c.tokenAllocator.releaseToken(ctx, c.p, tCtx, c.metrics) } diff --git a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/launcher.go b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/launcher.go index 9482b3df95..a8c2dc108b 100644 --- a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/launcher.go +++ b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/launcher.go @@ -38,6 +38,8 @@ func launch(ctx context.Context, p webapi.AsyncPlugin, tCtx core.TaskExecutionCo // Store the created resource name, and update our state. state.ResourceMeta = rMeta state.Phase = PhaseResourcesCreated + state.PhaseVersion = 2 + cacheItem := CacheItem{ State: *state, } @@ -49,5 +51,5 @@ func launch(ctx context.Context, p webapi.AsyncPlugin, tCtx core.TaskExecutionCo return nil, core.PhaseInfo{}, err } - return state, core.PhaseInfoQueued(time.Now(), 2, "launched"), nil + return state, core.PhaseInfoQueued(time.Now(), state.PhaseVersion, "launched"), nil } diff --git a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/launcher_test.go b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/launcher_test.go index 7836cc591d..a200de9b4f 100644 --- a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/launcher_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/launcher_test.go @@ -27,6 +27,7 @@ func Test_launch(t *testing.T) { s := State{ ResourceMeta: "abc", Phase: PhaseResourcesCreated, + PhaseVersion: 2, } c.OnGetOrCreate("my-id", CacheItem{State: s}).Return(CacheItem{State: s}, nil) @@ -51,6 +52,7 @@ func Test_launch(t *testing.T) { c := &mocks2.AutoRefresh{} s := State{ Phase: 
PhaseResourcesCreated, + PhaseVersion: 2, ResourceMeta: "abc", } @@ -96,6 +98,7 @@ func Test_launch(t *testing.T) { c := &mocks2.AutoRefresh{} s := State{ Phase: PhaseResourcesCreated, + PhaseVersion: 2, ResourceMeta: "my-id", } c.OnGetOrCreate("my-id", CacheItem{State: s}).Return(CacheItem{State: s}, fmt.Errorf("failed to cache")) diff --git a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/monitor.go b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/monitor.go index 4edbadcb07..6244be3f26 100644 --- a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/monitor.go +++ b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/monitor.go @@ -39,7 +39,7 @@ func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cach } return state, core.PhaseInfoFailure(errors.CacheFailed, cacheItem.ErrorMessage, nil), nil } - return state, core.PhaseInfoQueued(time.Now(), core.DefaultPhaseVersion, "job submitted"), nil + return state, core.PhaseInfoQueued(time.Now(), cacheItem.PhaseVersion, "job submitted"), nil } newPhase, err := p.Status(ctx, newPluginContext(cacheItem.ResourceMeta, cacheItem.Resource, "", tCtx)) @@ -57,6 +57,7 @@ func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cach } cacheItem.Phase = newPluginPhase + cacheItem.PhaseVersion = newPhase.Version() if newPluginPhase.IsTerminal() { // Queue item for deletion in the cache. diff --git a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/state.go b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/state.go index 164e8b2ef7..176b627b91 100644 --- a/flyteplugins/go/tasks/pluginmachinery/internal/webapi/state.go +++ b/flyteplugins/go/tasks/pluginmachinery/internal/webapi/state.go @@ -41,6 +41,9 @@ type State struct { // Phase current phase of the resource. Phase Phase `json:"phase,omitempty"` + // PhaseVersion is the version of the phase. 
This is used to detect if the phase has changed since the last time the state was updated. + PhaseVersion uint32 + // ResourceMeta contain metadata about resource this task created. This can be a complex structure or a simple type // (e.g. a string). It should contain enough information for the plugin to interact (retrieve, check status, delete) // with the resource through the remote service. diff --git a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go index d1b18e76dd..e41c4ccaa0 100644 --- a/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/agent/plugin.go @@ -40,6 +40,11 @@ type ResourceWrapper struct { LogLinks []*flyteIdl.TaskLog } +// IsTerminal is used to avoid making network calls to the agent service if the resource is already in a terminal state. +func (r ResourceWrapper) IsTerminal() bool { + return r.Phase == flyteIdl.TaskExecution_SUCCEEDED || r.Phase == flyteIdl.TaskExecution_FAILED || r.Phase == flyteIdl.TaskExecution_ABORTED +} + type ResourceMetaWrapper struct { OutputPrefix string AgentResourceMeta []byte diff --git a/flytestdlib/cache/auto_refresh.go b/flytestdlib/cache/auto_refresh.go index 58490669f4..252706d27a 100644 --- a/flytestdlib/cache/auto_refresh.go +++ b/flytestdlib/cache/auto_refresh.go @@ -116,11 +116,14 @@ type autoRefresh struct { syncCb SyncFunc createBatchesCb CreateBatchesFunc lruMap *lru.Cache - toDelete *syncSet - syncPeriod time.Duration - workqueue workqueue.RateLimitingInterface - parallelizm int - lock sync.RWMutex + // Items that are currently being processed are in the processing set. + // It will prevent the same item from being processed multiple times by different workers. 
+ processing *syncSet + toDelete *syncSet + syncPeriod time.Duration + workqueue workqueue.RateLimitingInterface + parallelizm int + lock sync.RWMutex } func getEvictionFunction(counter prometheus.Counter) func(key interface{}, value interface{}) { @@ -211,6 +214,13 @@ func (w *autoRefresh) GetOrCreate(id ItemID, item Item) (Item, error) { w.lruMap.Add(id, item) w.metrics.CacheMiss.Inc() + + // This fixes the cold start issue in the AutoRefreshCache by adding the item to the workqueue when it is created. + // This way, the item will be processed without waiting for the next sync cycle (30s by default). + batch := make([]ItemWrapper, 0, 1) + batch = append(batch, itemWrapper{id: id, item: item}) + w.workqueue.AddRateLimited(&batch) + w.processing.Insert(id) return item, nil } @@ -236,7 +246,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { // If not ok, it means evicted between the item was evicted between getting the keys and this update loop // which is fine, we can just ignore. if value, ok := w.lruMap.Peek(k); ok { - if item, ok := value.(Item); !ok || (ok && !item.IsTerminal()) { + if item, ok := value.(Item); !ok || (ok && !item.IsTerminal() && !w.processing.Contains(k)) { snapshot = append(snapshot, itemWrapper{ id: k.(ItemID), item: value.(Item), @@ -253,6 +263,9 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error { for _, batch := range batches { b := batch w.workqueue.AddRateLimited(&b) + for i := 0; i < len(b); i++ { + w.processing.Insert(b[i].GetID()) + } } return nil @@ -295,7 +308,6 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { logger.Debugf(ctx, "Shutting down worker") return nil } - // Since we create batches every time we sync, we will just remove the item from the queue here // regardless of whether it succeeded the sync or not. 
w.workqueue.Forget(batch) @@ -304,6 +316,7 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) { newBatch := make(Batch, 0, len(*batch.(*Batch))) for _, b := range *batch.(*Batch) { itemID := b.GetID() + w.processing.Remove(itemID) item, ok := w.lruMap.Get(itemID) if !ok { logger.Debugf(ctx, "item with id [%v] not found in cache", itemID) @@ -363,6 +376,7 @@ func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, sy createBatchesCb: createBatches, syncCb: syncCb, lruMap: lruCache, + processing: newSyncSet(), toDelete: newSyncSet(), syncPeriod: resyncPeriod, workqueue: workqueue.NewNamedRateLimitingQueue(syncRateLimiter, scope.CurrentScope()),