From 897f54940ab4ff78848383c113a8f4b6c51d0220 Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Tue, 14 May 2024 08:47:35 -0500 Subject: [PATCH] gracefully abort fasttask on worker timeout (#263) ## Overview This PR enables graceful aborts (rather than panics) when a fasttask times out waiting for worker availability. ## Test Plan Tested locally. ## Rollout Plan (if applicable) This can be rolled out along with any other changes. ## Upstream Changes Should this change be upstreamed to OSS (flyteorg/flyte)? If so, please check this box for auditing. Note, this is the responsibility of each developer. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F). - [ ] To be upstreamed ## Jira Issue https://unionai.atlassian.net/browse/EXO-103 ## Checklist * [ ] Added tests * [ ] Ran a deploy dry run and shared the terraform plan * [ ] Added logging and metrics * [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list) * [ ] Updated documentation --- fasttask/README.md | 2 ++ fasttask/plugin/service.go | 17 +++++++++-------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/fasttask/README.md b/fasttask/README.md index f5628e19056..5f61af01e1a 100644 --- a/fasttask/README.md +++ b/fasttask/README.md @@ -13,5 +13,7 @@ FastTask is a Flyte plugin to execute tasks quickly using a persistent, external export FLYTE_AWS_ACCESS_KEY_ID=minio export FLYTE_AWS_SECRET_ACCESS_KEY=miniostorage export FLYTE_AWS_ENDPOINT=http://localhost:30084 + + ./target/debug/worker bridge --queue-id=bar --fast-register-dir-override /tmp/fasttask-test ## build fast task worker image docker build -t hamersaw/fasttask: -f Dockerfile . diff --git a/fasttask/plugin/service.go b/fasttask/plugin/service.go index 261e5fca9be..656850bbad3 100644 --- a/fasttask/plugin/service.go +++ b/fasttask/plugin/service.go @@ -275,14 +275,15 @@ func (f *FastTaskService) Cleanup(ctx context.Context, taskID, queueID, workerID f.queuesLock.RLock() defer f.queuesLock.RUnlock() - queue := f.queues[queueID] - queue.lock.RLock() - defer queue.lock.RUnlock() - - if worker, exists := queue.workers[workerID]; exists { - worker.responseChan <- &pb.HeartbeatResponse{ - TaskId: taskID, - Operation: pb.HeartbeatResponse_DELETE, + if queue, exists := f.queues[queueID]; exists { + queue.lock.RLock() + defer queue.lock.RUnlock() + + if worker, exists := queue.workers[workerID]; exists { + worker.responseChan <- &pb.HeartbeatResponse{ + TaskId: taskID, + Operation: pb.HeartbeatResponse_DELETE, + } } }