Skip to content

Commit

Permalink
perf(runner): close fs service client when task finished
Browse files Browse the repository at this point in the history
  • Loading branch information
ma-pony committed Apr 5, 2023
1 parent 3fd8f8a commit c6356d3
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 1 deletion.
1 change: 1 addition & 0 deletions interfaces/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ type TaskRunner interface {
Dispose() (err error)
SetSubscribeTimeout(timeout time.Duration)
GetTaskId() (id primitive.ObjectID)
CleanUp() (err error)
}
13 changes: 13 additions & 0 deletions task/handler/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,19 @@ func (r *Runner) Dispose() (err error) {
}, backoff.NewExponentialBackOff())
}

// CleanUp clean up task runner
func (r *Runner) CleanUp() (err error) {
// close fs service
fsSvc := r.fsSvc.GetFsService().GetFs()
if fsSvc == nil {
return
}
if err = fsSvc.Close(); err != nil {
return
}
return
}

func (r *Runner) SetSubscribeTimeout(timeout time.Duration) {
r.subscribeTimeout = timeout
}
Expand Down
7 changes: 6 additions & 1 deletion task/handler/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,12 @@ func (svc *Service) run(taskId primitive.ObjectID) (err error) {
go func() {
// delete runner from pool
defer svc.deleteRunner(r.GetTaskId())

defer func(r interfaces.TaskRunner) {
err := r.CleanUp()
if err != nil {
log.Errorf("task[%s] clean up error: %v", r.GetTaskId().Hex(), err)
}
}(r)
// run task process (blocking)
// error or finish after task runner ends
if err := r.Run(); err != nil {
Expand Down

0 comments on commit c6356d3

Please sign in to comment.