Commit 6e90230

Merge branch 'master' into feature/arraynode-parallelism-config
Signed-off-by: Daniel Rammer <[email protected]>
hamersaw committed Apr 23, 2024
2 parents f6021e0 + 8c5aac6
Showing 9 changed files with 58 additions and 13 deletions.
7 changes: 7 additions & 0 deletions docs/_templates/base.html

Some generated files are not rendered by default.

@@ -44,11 +44,15 @@ type ResourceCache struct {
// A wrapper for each item in the cache.
type CacheItem struct {
    State

    Resource webapi.Resource
}

func (c CacheItem) IsTerminal() bool {
    if c.Resource != nil {
        if resource, ok := c.Resource.(interface{ IsTerminal() bool }); ok {
            return resource.IsTerminal()
        }
    }
    return c.State.Phase.IsTerminal()
}
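The type assertion in IsTerminal above is Go's optional-interface pattern: CacheItem does not require every webapi.Resource to implement IsTerminal(), it simply uses the method when the concrete type provides one and otherwise falls back to the cached phase. A standalone sketch of the pattern (the names here are illustrative, not from the Flyte codebase):

package main

import "fmt"

// fakeResource provides the optional IsTerminal method.
type fakeResource struct{ done bool }

func (f fakeResource) IsTerminal() bool { return f.done }

// isTerminal mirrors CacheItem.IsTerminal: prefer the resource's own
// answer when available, otherwise fall back to a default.
func isTerminal(r interface{}, fallback bool) bool {
    if r != nil {
        if t, ok := r.(interface{ IsTerminal() bool }); ok {
            return t.IsTerminal()
        }
    }
    return fallback
}

func main() {
    fmt.Println(isTerminal(fakeResource{done: true}, false)) // true: the resource decides
    fmt.Println(isTerminal("opaque", false))                 // false: falls back
    fmt.Println(isTerminal(nil, true))                       // true: falls back
}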

@@ -80,7 +84,7 @@ func (q *ResourceCache) SyncResource(ctx context.Context, batch cache.Batch) (
        logger.Debugf(ctx, "Sync loop - processing resource with cache key [%s]",
            resource.GetID())

-       if cacheItem.State.Phase.IsTerminal() {
+       if cacheItem.IsTerminal() {
            logger.Debugf(ctx, "Sync loop - resource cache key [%v] in terminal state",
                resource.GetID())
            resp = append(resp, cache.ItemSyncResponse{
10 changes: 8 additions & 2 deletions flyteplugins/go/tasks/pluginmachinery/internal/webapi/core.go
@@ -117,13 +117,19 @@ func (c CorePlugin) Abort(ctx context.Context, tCtx core.TaskExecutionContext) e
}

func (c CorePlugin) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error {
+   cacheItemID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
+   err := c.cache.DeleteDelayed(cacheItemID)
+   if err != nil {
+       logger.Errorf(ctx, "Failed to delete resource [%v] from cache. Error: %v", cacheItemID, err)
+       return fmt.Errorf("failed to delete resource [%v] from cache. Error: %v", cacheItemID, err)
+   }

    if len(c.p.GetConfig().ResourceQuotas) == 0 {
        // If there are no defined quotas, there is nothing to cleanup.
        return nil
    }

-   logger.Infof(ctx, "Attempting to finalize resource [%v].",
-       tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName())
+   logger.Infof(ctx, "Attempting to finalize resource [%v].", cacheItemID)

    return c.tokenAllocator.releaseToken(ctx, c.p, tCtx, c.metrics)
}
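Finalize now evicts the task's entry from the auto-refresh cache before (optionally) releasing quota tokens, so a completed task stops consuming sync cycles. DeleteDelayed itself is not part of this diff; given the toDelete set shown in auto_refresh.go below, a plausible minimal implementation would simply mark the ID and let the sync machinery drop it on the next pass. A sketch under that assumption, not verified upstream source:

// DeleteDelayed marks an item for removal; the cache's sync machinery
// later drops every marked ID instead of re-syncing it.
func (w *autoRefresh) DeleteDelayed(id ItemID) error {
    w.toDelete.Insert(id)
    return nil
}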
@@ -38,6 +38,8 @@ func launch(ctx context.Context, p webapi.AsyncPlugin, tCtx core.TaskExecutionCo
    // Store the created resource name, and update our state.
    state.ResourceMeta = rMeta
    state.Phase = PhaseResourcesCreated
+   state.PhaseVersion = 2
+
    cacheItem := CacheItem{
        State: *state,
    }

@@ -49,5 +51,5 @@ func launch(ctx context.Context, p webapi.AsyncPlugin, tCtx core.TaskExecutionCo
        return nil, core.PhaseInfo{}, err
    }

-   return state, core.PhaseInfoQueued(time.Now(), 2, "launched"), nil
+   return state, core.PhaseInfoQueued(time.Now(), state.PhaseVersion, "launched"), nil
}
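Setting state.PhaseVersion = 2 and then returning it through core.PhaseInfoQueued keeps the version persisted in the cache and the version reported to the framework in lockstep, instead of hard-coding 2 in one place. Phase versions exist so a consumer can detect an update even when the phase name itself has not changed; a hedged sketch of that comparison (hasStateChanged is illustrative, not a Flyte API):

// hasStateChanged reports whether a newly observed State should be
// treated as an update: either the phase moved, or the version was
// bumped within the same phase.
func hasStateChanged(prev, curr State) bool {
    return prev.Phase != curr.Phase || prev.PhaseVersion != curr.PhaseVersion
}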
@@ -27,6 +27,7 @@ func Test_launch(t *testing.T) {
        s := State{
            ResourceMeta: "abc",
            Phase:        PhaseResourcesCreated,
+           PhaseVersion: 2,
        }
        c.OnGetOrCreate("my-id", CacheItem{State: s}).Return(CacheItem{State: s}, nil)

@@ -51,6 +52,7 @@ func Test_launch(t *testing.T) {
        c := &mocks2.AutoRefresh{}
        s := State{
            Phase:        PhaseResourcesCreated,
+           PhaseVersion: 2,
            ResourceMeta: "abc",
        }

@@ -96,6 +98,7 @@ func Test_launch(t *testing.T) {
        c := &mocks2.AutoRefresh{}
        s := State{
            Phase:        PhaseResourcesCreated,
+           PhaseVersion: 2,
            ResourceMeta: "my-id",
        }
        c.OnGetOrCreate("my-id", CacheItem{State: s}).Return(CacheItem{State: s}, fmt.Errorf("failed to cache"))
@@ -39,7 +39,7 @@ func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cach
            }
            return state, core.PhaseInfoFailure(errors.CacheFailed, cacheItem.ErrorMessage, nil), nil
        }
-       return state, core.PhaseInfoQueued(time.Now(), core.DefaultPhaseVersion, "job submitted"), nil
+       return state, core.PhaseInfoQueued(time.Now(), cacheItem.PhaseVersion, "job submitted"), nil
    }

    newPhase, err := p.Status(ctx, newPluginContext(cacheItem.ResourceMeta, cacheItem.Resource, "", tCtx))

@@ -57,6 +57,7 @@ func monitor(ctx context.Context, tCtx core.TaskExecutionContext, p Client, cach
    }

    cacheItem.Phase = newPluginPhase
+   cacheItem.PhaseVersion = newPhase.Version()

    if newPluginPhase.IsTerminal() {
        // Queue item for deletion in the cache.
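Copying newPhase.Version() into the cache item matters because the cached PhaseVersion is exactly what the "job submitted" branch above reports while the resource has not yet synced; without the copy, that branch would keep emitting the default version, and updates within an unchanged phase could be treated as duplicates. A toy sketch of a remote status check that advances the version while staying in the same phase (illustrative only, not a Flyte API):

// checkStatus stays in the same phase but bumps the version whenever
// new information (for example, fresh log links) arrives, so that
// downstream consumers register an update.
func checkStatus(prevVersion uint32, newLogs bool) (phase string, version uint32) {
    version = prevVersion
    if newLogs {
        version++
    }
    return "Running", version
}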
@@ -41,6 +41,9 @@ type State struct {
    // Phase is the current phase of the resource.
    Phase Phase `json:"phase,omitempty"`

+   // PhaseVersion is the version of the phase. It is used to detect that the state has changed even when
+   // the phase itself is unchanged, for example when new information arrives within the same phase.
+   PhaseVersion uint32
+
    // ResourceMeta contains metadata about the resource this task created. This can be a complex structure or a
    // simple type (e.g. a string). It should contain enough information for the plugin to interact (retrieve,
    // check status, delete) with the resource through the remote service.
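One subtlety: unlike Phase, the new PhaseVersion field carries no json tag, so a serialized State writes it under the key "PhaseVersion" rather than a snake_case name, and without omitempty it is always emitted. A standalone illustration:

package main

import (
    "encoding/json"
    "fmt"
)

type state struct {
    Phase        int `json:"phase,omitempty"`
    PhaseVersion uint32
}

func main() {
    b, _ := json.Marshal(state{Phase: 0, PhaseVersion: 2})
    fmt.Println(string(b)) // {"PhaseVersion":2}: zero phase omitted, version untagged
}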
5 changes: 5 additions & 0 deletions flyteplugins/go/tasks/plugins/webapi/agent/plugin.go
@@ -40,6 +40,11 @@ type ResourceWrapper struct {
    LogLinks []*flyteIdl.TaskLog
}

+// IsTerminal is used to avoid making network calls to the agent service if the resource is already in a terminal state.
+func (r ResourceWrapper) IsTerminal() bool {
+   return r.Phase == flyteIdl.TaskExecution_SUCCEEDED || r.Phase == flyteIdl.TaskExecution_FAILED || r.Phase == flyteIdl.TaskExecution_ABORTED
+}
+
type ResourceMetaWrapper struct {
    OutputPrefix      string
    AgentResourceMeta []byte
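Because the agent's status check is a gRPC round trip, short-circuiting on a cached terminal phase saves real network traffic on every sync. A table-driven test for the new method might look like the following (a sketch; it assumes the flyteIdl import alias from the surrounding file):

func TestResourceWrapperIsTerminal(t *testing.T) {
    cases := []struct {
        phase flyteIdl.TaskExecution_Phase
        want  bool
    }{
        {flyteIdl.TaskExecution_SUCCEEDED, true},
        {flyteIdl.TaskExecution_FAILED, true},
        {flyteIdl.TaskExecution_ABORTED, true},
        {flyteIdl.TaskExecution_RUNNING, false},
        {flyteIdl.TaskExecution_QUEUED, false},
    }
    for _, c := range cases {
        if got := (ResourceWrapper{Phase: c.phase}).IsTerminal(); got != c.want {
            t.Errorf("IsTerminal(%v) = %v, want %v", c.phase, got, c.want)
        }
    }
}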
28 changes: 21 additions & 7 deletions flytestdlib/cache/auto_refresh.go
@@ -116,11 +116,14 @@ type autoRefresh struct {
    syncCb          SyncFunc
    createBatchesCb CreateBatchesFunc
    lruMap          *lru.Cache
-   toDelete        *syncSet
-   syncPeriod      time.Duration
-   workqueue       workqueue.RateLimitingInterface
-   parallelizm     int
-   lock            sync.RWMutex
+   // Items that are currently being processed are in the processing set.
+   // It will prevent the same item from being processed multiple times by different workers.
+   processing      *syncSet
+   toDelete        *syncSet
+   syncPeriod      time.Duration
+   workqueue       workqueue.RateLimitingInterface
+   parallelizm     int
+   lock            sync.RWMutex
}

func getEvictionFunction(counter prometheus.Counter) func(key interface{}, value interface{}) {
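The syncSet type itself is not defined anywhere in this diff. For orientation, a minimal mutex-guarded set exposing the three operations used here (Insert, Remove, Contains) could look like this; it is a sketch of the shape the diff implies, not the verified flytestdlib source:

type syncSet struct {
    mu      sync.Mutex
    members map[interface{}]struct{}
}

func newSyncSet() *syncSet {
    return &syncSet{members: make(map[interface{}]struct{})}
}

func (s *syncSet) Insert(key interface{}) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.members[key] = struct{}{}
}

func (s *syncSet) Remove(key interface{}) {
    s.mu.Lock()
    defer s.mu.Unlock()
    delete(s.members, key)
}

func (s *syncSet) Contains(key interface{}) bool {
    s.mu.Lock()
    defer s.mu.Unlock()
    _, ok := s.members[key]
    return ok
}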
@@ -211,6 +214,13 @@ func (w *autoRefresh) GetOrCreate(id ItemID, item Item) (Item, error) {

    w.lruMap.Add(id, item)
    w.metrics.CacheMiss.Inc()
+
+   // Adding the item to the workqueue as soon as it is created fixes the cold-start issue in the
+   // AutoRefreshCache: the item is processed without waiting for the next sync cycle (30s by default).
+   batch := make([]ItemWrapper, 0, 1)
+   batch = append(batch, itemWrapper{id: id, item: item})
+   w.workqueue.AddRateLimited(&batch)
+   w.processing.Insert(id)
    return item, nil
}
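The enqueued batch is a one-element slice: ItemWrapper is the cache's generic carrier of an item plus its key, and itemWrapper is its unexported implementation. Roughly, as the usage above implies (a sketch, not the verified flytestdlib source):

// ItemWrapper pairs a cached item with its key so sync workers can
// process heterogeneous batches generically.
type ItemWrapper interface {
    GetID() ItemID
    GetItem() Item
}

type itemWrapper struct {
    id   ItemID
    item Item
}

func (i itemWrapper) GetID() ItemID { return i.id }
func (i itemWrapper) GetItem() Item { return i.item }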

@@ -236,7 +246,7 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {
            // If not ok, it means the item was evicted between getting the keys and this update loop,
            // which is fine; we can just ignore it.
            if value, ok := w.lruMap.Peek(k); ok {
-               if item, ok := value.(Item); !ok || (ok && !item.IsTerminal()) {
+               if item, ok := value.(Item); !ok || (ok && !item.IsTerminal() && !w.processing.Contains(k)) {
                    snapshot = append(snapshot, itemWrapper{
                        id:   k.(ItemID),
                        item: value.(Item),
@@ -253,6 +263,9 @@ func (w *autoRefresh) enqueueBatches(ctx context.Context) error {
    for _, batch := range batches {
        b := batch
        w.workqueue.AddRateLimited(&b)
+       for i := 0; i < len(b); i++ {
+           w.processing.Insert(b[i].GetID())
+       }
    }

    return nil
@@ -295,7 +308,6 @@ func (w *autoRefresh) sync(ctx context.Context) (err error) {
                logger.Debugf(ctx, "Shutting down worker")
                return nil
            }
-
            // Since we create batches every time we sync, we will just remove the item from the queue here
            // regardless of whether it succeeded the sync or not.
            w.workqueue.Forget(batch)

@@ -304,6 +316,7 @@
            newBatch := make(Batch, 0, len(*batch.(*Batch)))
            for _, b := range *batch.(*Batch) {
                itemID := b.GetID()
+               w.processing.Remove(itemID)
                item, ok := w.lruMap.Get(itemID)
                if !ok {
                    logger.Debugf(ctx, "item with id [%v] not found in cache", itemID)
@@ -363,6 +376,7 @@ func NewAutoRefreshBatchedCache(name string, createBatches CreateBatchesFunc, sy
        createBatchesCb: createBatches,
        syncCb:          syncCb,
        lruMap:          lruCache,
+       processing:      newSyncSet(),
        toDelete:        newSyncSet(),
        syncPeriod:      resyncPeriod,
        workqueue:       workqueue.NewNamedRateLimitingQueue(syncRateLimiter, scope.CurrentScope()),
