Skip to content

Commit

Permalink
Add pending owners cache (#267)
Browse files Browse the repository at this point in the history
## Overview
When kicking off fast tasks, we typically have to do a second round of task evaluation before a worker is available, which adds latency to the initial task runs while the worker(s) come up. This change keeps an in-memory cache of tasks waiting on a worker so that when the first one comes up, we can opportunistically enqueue the owning workflow for evaluation and avoid a ~10s delay.

I chose to use a service-wide lock, which trades off some lock contention for reduced complexity. This is acceptable since we already grab the service-wide `queuesLock` when discovering a new worker (call to `Heartbeat`).

## Test Plan
~- [ ] Haven't added any unittests yet. Wanted to get feedback on the approach~
Going to defer unittests to the broad pass @hamersaw is doing

- [x] Ran locally and verified that with the change tasks do not require a second round  

Without the enqueue call (2s delay from worker registered -> send task)
```
"2024-05-13T16:42:22-07:00"
"adding pending owner flytesnacks-development/feb7da731f60c482db2d for task feb7da731f60c482db2d-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:42:22-07:00"
"offering task feb7da731f60c482db2d-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:42:22-07:00"
"offering task feb7da731f60c482db2d-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:42:22-07:00"
"offering task feb7da731f60c482db2d-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:42:23-07:00"
"worker 6b772a8b-7748-4819-99ea-140086ca27af registered with queue 4fc648840f89c02"
"2024-05-13T16:42:25-07:00"
"offering task feb7da731f60c482db2d-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:42:25-07:00"
"sending task feb7da731f60c482db2d-n0-0 to worker 6b772a8b-7748-4819-99ea-140086ca27af on queue 4fc648840f89c02"
```

With enqueue call (same second)
```
"2024-05-13T16:48:25-07:00"
"adding pending owner flytesnacks-development/f96f3fe69ae744129ab3 for task f96f3fe69ae744129ab3-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:48:25-07:00"
"offering task f96f3fe69ae744129ab3-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:48:25-07:00"
"offering task f96f3fe69ae744129ab3-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:48:25-07:00"
"offering task f96f3fe69ae744129ab3-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:48:26-07:00"
"worker abb8da76-b4f2-44a2-8fe9-c577f682c914 registered with queue 4fc648840f89c02"
"2024-05-13T16:48:26-07:00"
"offering task f96f3fe69ae744129ab3-n0-0 on queue 4fc648840f89c02"
"2024-05-13T16:48:26-07:00"
"sending task f96f3fe69ae744129ab3-n0-0 to worker abb8da76-b4f2-44a2-8fe9-c577f682c914 on queue 4fc648840f89c02"
```

## Rollout Plan (if applicable)
No planning to put this behind a config (although code potentially move `maxPendingOwnersPerQueue` to a config and treat 0 as disabled). Will bring to cloud and deploy in the coming days

## Upstream Changes
Should this change be upstreamed to OSS (flyteorg/flyte)? If so, please check this box for auditing. Note, this is the responsibility of each developer. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [ ] To be upstreamed
  • Loading branch information
andrewwdye authored May 16, 2024
1 parent cadb26b commit 30c3790
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 31 deletions.
1 change: 0 additions & 1 deletion fasttask/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ func (p *Plugin) Abort(ctx context.Context, tCtx core.TaskExecutionContext) erro
// active executions. this is performed in the `Finalize` function which is _always_ called
// during any abort. if this logic changes, we will need to add a call to
// `fastTaskService.Cleanup` to ensure proper abort here.
// TODO: add this since the service may now hold pending tasks without a worker up
return nil
}

Expand Down
153 changes: 123 additions & 30 deletions fasttask/plugin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,22 @@ import (
"github.com/unionai/flyte/fasttask/plugin/pb"
)

var maxPendingOwnersPerQueue = 100

// FastTaskService is a gRPC service that manages assignment and management of task executions with
// respect to fasttask workers.
type FastTaskService struct {
pb.UnimplementedFastTaskServer
enqueueOwner core.EnqueueOwner
queues map[string]*Queue
queuesLock sync.RWMutex
enqueueOwner core.EnqueueOwner

queues map[string]*Queue
queuesLock sync.RWMutex

// A map of pending owners by queue. When a new worker becomes available, use this to enqueue owners for reevaluation.
// Note, this is an optimistic approach and may not include all pending owners.
pendingTaskOwners map[string]map[string]types.NamespacedName // map[queueID]map[taskID]ownerID
pendingTaskOwnersLock sync.RWMutex

taskStatusChannels sync.Map // map[string]chan *WorkerTaskStatus
metrics metrics
}
Expand Down Expand Up @@ -107,33 +116,11 @@ func (f *FastTaskService) Heartbeat(stream pb.FastTask_HeartbeatServer) error {
}

// register worker with queue
f.queuesLock.Lock()
queue, exists := f.queues[heartbeatRequest.GetQueueId()]
if !exists {
queue = &Queue{
workers: make(map[string]*Worker),
}
f.queues[heartbeatRequest.GetQueueId()] = queue
}
f.queuesLock.Unlock()

queue.lock.Lock()
queue.workers[workerID] = worker
queue.lock.Unlock()
queue := f.addWorkerToQueue(heartbeatRequest.GetQueueId(), worker)

// cleanup worker on exit
defer func() {
f.queuesLock.Lock()
queue, exists := f.queues[heartbeatRequest.GetQueueId()]
if exists {
queue.lock.Lock()
delete(queue.workers, workerID)
if len(queue.workers) == 0 {
delete(f.queues, heartbeatRequest.GetQueueId())
}
queue.lock.Unlock()
}
f.queuesLock.Unlock()
f.removeWorkerFromQueue(heartbeatRequest.GetQueueId(), workerID)
}()

// start go routine to handle heartbeat responses
Expand All @@ -150,6 +137,9 @@ func (f *FastTaskService) Heartbeat(stream pb.FastTask_HeartbeatServer) error {
}
}()

// new worker available, enqueue owners
f.enqueuePendingOwners(heartbeatRequest.GetQueueId())

// handle heartbeat requests
for {
heartbeatRequest, err := stream.Recv()
Expand Down Expand Up @@ -198,6 +188,100 @@ func (f *FastTaskService) Heartbeat(stream pb.FastTask_HeartbeatServer) error {
return nil
}

func (f *FastTaskService) addWorkerToQueue(queueID string, worker *Worker) *Queue {
f.queuesLock.Lock()
defer f.queuesLock.Unlock()

queue, exists := f.queues[queueID]
if !exists {
queue = &Queue{
workers: make(map[string]*Worker),
}
f.queues[queueID] = queue
}

queue.lock.Lock()
defer queue.lock.Unlock()

queue.workers[worker.workerID] = worker
return queue
}

func (f *FastTaskService) removeWorkerFromQueue(queueID, workerID string) {
f.queuesLock.Lock()
defer f.queuesLock.Unlock()

queue, exists := f.queues[queueID]
if !exists {
return
}

queue.lock.Lock()
defer queue.lock.Unlock()

delete(queue.workers, workerID)
if len(queue.workers) == 0 {
delete(f.queues, queueID)
}
}

// addPendingOwner adds to the pending owners list for the queue, if not already full
func (f *FastTaskService) addPendingOwner(queueID, taskID string, ownerID types.NamespacedName) {
f.pendingTaskOwnersLock.Lock()
defer f.pendingTaskOwnersLock.Unlock()

owners, exists := f.pendingTaskOwners[queueID]
if !exists {
owners = make(map[string]types.NamespacedName)
f.pendingTaskOwners[queueID] = owners
}

if len(owners) >= maxPendingOwnersPerQueue {
return
}
owners[taskID] = ownerID
}

// removePendingOwner removes the pending owner from the list if still there
func (f *FastTaskService) removePendingOwner(queueID, taskID string) {
f.pendingTaskOwnersLock.Lock()
defer f.pendingTaskOwnersLock.Unlock()

owners, exists := f.pendingTaskOwners[queueID]
if !exists {
return
}

delete(owners, taskID)
if len(owners) == 0 {
delete(f.pendingTaskOwners, queueID)
}
}

// enqueuePendingOwners drains the pending owners list for the queue and enqueues them for reevaluation
func (f *FastTaskService) enqueuePendingOwners(queueID string) {
f.pendingTaskOwnersLock.Lock()
defer f.pendingTaskOwnersLock.Unlock()

owners, exists := f.pendingTaskOwners[queueID]
if !exists {
return
}

enqueued := make(map[types.NamespacedName]bool)
for _, ownerID := range owners {
if _, ok := enqueued[ownerID]; ok {
continue
}
if err := f.enqueueOwner(ownerID); err != nil {
logger.Warnf(context.Background(), "failed to enqueue owner %s: %+v", ownerID, err)
}
enqueued[ownerID] = true
}

delete(f.pendingTaskOwners, queueID)
}

// OfferOnQueue offers a task to a worker on a specific queue. If no workers are available, an
// empty string is returned.
func (f *FastTaskService) OfferOnQueue(ctx context.Context, queueID, taskID, namespace, workflowID string, cmd []string) (string, error) {
Expand All @@ -206,6 +290,7 @@ func (f *FastTaskService) OfferOnQueue(ctx context.Context, queueID, taskID, nam

queue, exists := f.queues[queueID]
if !exists {
f.addPendingOwner(queueID, taskID, types.NamespacedName{Namespace: namespace, Name: workflowID})
f.metrics.taskNoWorkersAvailable.Inc()
return "", nil // no workers available
}
Expand All @@ -232,8 +317,11 @@ func (f *FastTaskService) OfferOnQueue(ctx context.Context, queueID, taskID, nam
worker = acceptedWorkers[rand.Intn(len(acceptedWorkers))]
worker.capacity.BacklogCount++
} else {
// No workers available. Note, we do not add to pending owners at this time as we are optimizing for the worker
// startup case. Theworker backlog should be sufficient to keep the worker busy without needing to proactively
// enqueue owners when capacity becomes available.
f.metrics.taskNoCapacityAvailable.Inc()
return "", nil // no workers available
return "", nil
}

// send assign message to worker
Expand Down Expand Up @@ -325,15 +413,20 @@ func (f *FastTaskService) Cleanup(ctx context.Context, taskID, queueID, workerID

// delete task context
f.taskStatusChannels.Delete(taskID)

// remove pending owner
f.removePendingOwner(queueID, taskID)

return nil
}

// NewFastTaskService creates a new FastTaskService.
func NewFastTaskService(enqueueOwner core.EnqueueOwner) *FastTaskService {
scope := promutils.NewScope("fasttask")
svc := &FastTaskService{
enqueueOwner: enqueueOwner,
queues: make(map[string]*Queue),
enqueueOwner: enqueueOwner,
queues: make(map[string]*Queue),
pendingTaskOwners: make(map[string]map[string]types.NamespacedName),
metrics: metrics{
taskNoWorkersAvailable: scope.MustNewCounter("task_no_workers_available", "Count of task assignment attempts with no workers available"),
taskNoCapacityAvailable: scope.MustNewCounter("task_no_capacity_available", "Count of task assignment attempts with no capacity available"),
Expand Down

0 comments on commit 30c3790

Please sign in to comment.