Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: curio: refactor curio graceful shutdown #11794

Merged
merged 2 commits into from
Mar 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cmd/curio/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/pkg/errors"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -128,7 +127,7 @@ var runCmd = &cli.Command{
if err != nil {
return nil
}
defer taskEngine.GracefullyTerminate(time.Hour)
defer taskEngine.GracefullyTerminate()

err = rpc.ListenAndServe(ctx, dependencies, shutdownChan) // Monitor for shutdown.
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions itests/harmonytask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestHarmonyTasks(t *testing.T) {
e, err := harmonytask.New(cdb, []harmonytask.TaskInterface{t1}, "test:1")
require.NoError(t, err)
time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE.
e.GracefullyTerminate(time.Minute)
e.GracefullyTerminate()
expected := []string{"taskResult56", "taskResult73"}
sort.Strings(t1.WorkCompleted)
require.Equal(t, expected, t1.WorkCompleted, "unexpected results")
Expand Down Expand Up @@ -173,8 +173,8 @@ func TestHarmonyTasksWith2PartiesPolling(t *testing.T) {
worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{workerParty}, "test:2")
require.NoError(t, err)
time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE.
sender.GracefullyTerminate(time.Second * 5)
worker.GracefullyTerminate(time.Second * 5)
sender.GracefullyTerminate()
worker.GracefullyTerminate()
sort.Strings(dest)
require.Equal(t, []string{"A", "B"}, dest)
})
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestWorkStealing(t *testing.T) {
worker, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fooLetterSaver(t, cdb, &dest)}, "test:2")
require.ErrorIs(t, err, nil)
time.Sleep(time.Second) // do the work. FLAKYNESS RISK HERE.
worker.GracefullyTerminate(time.Second * 5)
worker.GracefullyTerminate()
require.Equal(t, []string{"M"}, dest)
})
}
Expand Down Expand Up @@ -243,8 +243,8 @@ func TestTaskRetry(t *testing.T) {
rcv, err := harmonytask.New(cdb, []harmonytask.TaskInterface{fails2xPerMsg}, "test:2")
require.NoError(t, err)
time.Sleep(time.Second)
sender.GracefullyTerminate(time.Hour)
rcv.GracefullyTerminate(time.Hour)
sender.GracefullyTerminate()
rcv.GracefullyTerminate()
sort.Strings(dest)
require.Equal(t, []string{"A", "B"}, dest)
type hist struct {
Expand Down
2 changes: 1 addition & 1 deletion itests/kit/ensemble.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ func (n *Ensemble) Start() *Ensemble {
if err != nil {
return nil
}
defer taskEngine.GracefullyTerminate(time.Hour)
defer taskEngine.GracefullyTerminate()

err = rpc.ListenAndServe(ctx, p.Deps, shutdownChan) // Monitor for shutdown.
require.NoError(n.t, err)
Expand Down
57 changes: 44 additions & 13 deletions lib/harmony/harmonytask/harmonytask.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,22 +193,53 @@ func New(
// GracefullyTerminate hangs until all present tasks have completed.
// Call this to cleanly exit the process. As some processes are long-running,
// passing a deadline will ignore those still running (to be picked-up later).
func (e *TaskEngine) GracefullyTerminate(deadline time.Duration) {
e.grace()
e.reg.Shutdown()
deadlineChan := time.NewTimer(deadline).C
top:
for _, h := range e.handlers {
if h.Count.Load() > 0 {
select {
case <-deadlineChan:
return
default:
time.Sleep(time.Millisecond)
goto top
func (e *TaskEngine) GracefullyTerminate() {

// If there are any Post tasks then wait till Timeout and check again
// When no Post tasks are active, break out of loop and call the shutdown function
for {
timeout := time.Second
for _, h := range e.handlers {
if h.TaskTypeDetails.Name == "WinPost" && h.Count.Load() > 0 {
timeout = time.Second * 30
log.Infof("node shutdown deferred for %f seconds", timeout.Seconds())
continue
}
if h.TaskTypeDetails.Name == "WdPost" && h.Count.Load() > 0 {
timeout = time.Minute
log.Infof("node shutdown deferred for %f seconds due to running WdPost task", timeout.Seconds())
continue
}

if h.TaskTypeDetails.Name == "WdPostSubmit" && h.Count.Load() > 0 {
timeout = time.Minute
log.Infof("node shutdown deferred for %f seconds due to running WdPostSubmit task", timeout.Seconds())
continue
}

if h.TaskTypeDetails.Name == "WdPostRecover" && h.Count.Load() > 0 {
timeout = time.Minute
log.Infof("node shutdown deferred for %f seconds due to running WdPostRecover task", timeout.Seconds())
continue
}

// Test tasks fir itest
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
if h.TaskTypeDetails.Name == "ThingOne" && h.Count.Load() > 0 {
timeout = time.Second * 2
log.Infof("node shutdown deferred for %f seconds due to running itest task", timeout.Seconds())
continue
}
}
if timeout > time.Second {
time.Sleep(timeout)
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
continue
}
break
}

e.grace()
e.reg.Shutdown()
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
return
}

func (e *TaskEngine) poller() {
Expand Down