Skip to content

Commit

Permalink
gracefully abort fasttask on worker timeout (#263)
Browse files Browse the repository at this point in the history
## Overview
This PR enables graceful aborts (rather than panics) when a fasttask times out waiting for worker availability.

## Test Plan
Tested locally.

## Rollout Plan (if applicable)
This can be rolled out along with any other changes.

## Upstream Changes
Should this change be upstreamed to OSS (flyteorg/flyte)? If so, please check this box for auditing. Note, this is the responsibility of each developer. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [ ] To be upstreamed

## Jira Issue
https://unionai.atlassian.net/browse/EXO-103

## Checklist
* [ ] Added tests
* [ ] Ran a deploy dry run and shared the terraform plan
* [ ] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [ ] Updated documentation
  • Loading branch information
hamersaw authored May 14, 2024
1 parent c0a15cc commit 897f549
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
2 changes: 2 additions & 0 deletions fasttask/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@ FastTask is a Flyte plugin to execute tasks quickly using a persistent, external
export FLYTE_AWS_ACCESS_KEY_ID=minio
export FLYTE_AWS_SECRET_ACCESS_KEY=miniostorage
export FLYTE_AWS_ENDPOINT=http://localhost:30084

./target/debug/worker bridge --queue-id=bar --fast-register-dir-override /tmp/fasttask-test
## build fast task worker image
docker build -t hamersaw/fasttask:<tag> -f Dockerfile .
17 changes: 9 additions & 8 deletions fasttask/plugin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,15 @@ func (f *FastTaskService) Cleanup(ctx context.Context, taskID, queueID, workerID
f.queuesLock.RLock()
defer f.queuesLock.RUnlock()

queue := f.queues[queueID]
queue.lock.RLock()
defer queue.lock.RUnlock()

if worker, exists := queue.workers[workerID]; exists {
worker.responseChan <- &pb.HeartbeatResponse{
TaskId: taskID,
Operation: pb.HeartbeatResponse_DELETE,
if queue, exists := f.queues[queueID]; exists {
queue.lock.RLock()
defer queue.lock.RUnlock()

if worker, exists := queue.workers[workerID]; exists {
worker.responseChan <- &pb.HeartbeatResponse{
TaskId: taskID,
Operation: pb.HeartbeatResponse_DELETE,
}
}
}

Expand Down

0 comments on commit 897f549

Please sign in to comment.