diff --git a/fasttask/README.md b/fasttask/README.md index f5628e19056..5f61af01e1a 100644 --- a/fasttask/README.md +++ b/fasttask/README.md @@ -13,5 +13,7 @@ FastTask is a Flyte plugin to execute tasks quickly using a persistent, external export FLYTE_AWS_ACCESS_KEY_ID=minio export FLYTE_AWS_SECRET_ACCESS_KEY=miniostorage export FLYTE_AWS_ENDPOINT=http://localhost:30084 + + ./target/debug/worker bridge --queue-id=bar --fast-register-dir-override /tmp/fasttask-test ## build fast task worker image docker build -t hamersaw/fasttask: -f Dockerfile . diff --git a/fasttask/plugin/service.go b/fasttask/plugin/service.go index 261e5fca9be..656850bbad3 100644 --- a/fasttask/plugin/service.go +++ b/fasttask/plugin/service.go @@ -275,14 +275,15 @@ func (f *FastTaskService) Cleanup(ctx context.Context, taskID, queueID, workerID f.queuesLock.RLock() defer f.queuesLock.RUnlock() - queue := f.queues[queueID] - queue.lock.RLock() - defer queue.lock.RUnlock() - - if worker, exists := queue.workers[workerID]; exists { - worker.responseChan <- &pb.HeartbeatResponse{ - TaskId: taskID, - Operation: pb.HeartbeatResponse_DELETE, + if queue, exists := f.queues[queueID]; exists { + queue.lock.RLock() + defer queue.lock.RUnlock() + + if worker, exists := queue.workers[workerID]; exists { + worker.responseChan <- &pb.HeartbeatResponse{ + TaskId: taskID, + Operation: pb.HeartbeatResponse_DELETE, + } } }