Skip to content

Commit

Permalink
worker: Commands to pause/resume task processing
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Oct 27, 2020
1 parent 413643a commit 033e1b0
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 1 deletion.
14 changes: 14 additions & 0 deletions api/api_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,19 @@ type WorkerAPI interface {

StorageAddLocal(ctx context.Context, path string) error

// SetEnabled marks the worker as enabled/disabled. Not that this setting
// may take a few seconds to propagate to task scheduler
SetEnabled(ctx context.Context, enabled bool) error

Enabled(ctx context.Context) (bool, error)

// WaitQuiet blocks until there are no tasks running
WaitQuiet(ctx context.Context) error

// returns a random UUID of worker session, generated randomly when worker
// process starts
ProcessSession(context.Context) (uuid.UUID, error)

// Like ProcessSession, but returns an error when worker is disabled
Session(context.Context) (uuid.UUID, error)
}
24 changes: 23 additions & 1 deletion api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,13 @@ type WorkerStruct struct {
Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`

Session func(context.Context) (uuid.UUID, error) `perm:"admin"`
SetEnabled func(ctx context.Context, enabled bool) error `perm:"admin"`
Enabled func(ctx context.Context) (bool, error) `perm:"admin"`

WaitQuiet func(ctx context.Context) error `perm:"admin"`

ProcessSession func(context.Context) (uuid.UUID, error) `perm:"admin"`
Session func(context.Context) (uuid.UUID, error) `perm:"admin"`
}
}

Expand Down Expand Up @@ -1544,6 +1550,22 @@ func (w *WorkerStruct) StorageAddLocal(ctx context.Context, path string) error {
return w.Internal.StorageAddLocal(ctx, path)
}

func (w *WorkerStruct) SetEnabled(ctx context.Context, enabled bool) error {
return w.Internal.SetEnabled(ctx, enabled)
}

func (w *WorkerStruct) Enabled(ctx context.Context) (bool, error) {
return w.Internal.Enabled(ctx)
}

func (w *WorkerStruct) WaitQuiet(ctx context.Context) error {
return w.Internal.WaitQuiet(ctx)
}

func (w *WorkerStruct) ProcessSession(ctx context.Context) (uuid.UUID, error) {
return w.Internal.ProcessSession(ctx)
}

func (w *WorkerStruct) Session(ctx context.Context) (uuid.UUID, error) {
return w.Internal.Session(ctx)
}
Expand Down
51 changes: 51 additions & 0 deletions cmd/lotus-seal-worker/cli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import (
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

lcli "github.com/filecoin-project/lotus/cli"
)

var setCmd = &cli.Command{
Name: "set",
Usage: "Manage worker settings",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "enabled",
Usage: "enable/disable new task processing",
Value: true,
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetWorkerAPI(cctx)
if err != nil {
return err
}
defer closer()

ctx := lcli.ReqContext(cctx)

if err := api.SetEnabled(ctx, cctx.Bool("enabled")); err != nil {
return xerrors.Errorf("SetEnabled: %w", err)
}

return nil
},
}

var waitQuietCmd = &cli.Command{
Name: "wait-quiet",
Usage: "Block until all running tasks exit",
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetWorkerAPI(cctx)
if err != nil {
return err
}
defer closer()

ctx := lcli.ReqContext(cctx)

return api.WaitQuiet(ctx)
},
}
12 changes: 12 additions & 0 deletions cmd/lotus-seal-worker/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ var infoCmd = &cli.Command{
cli.VersionPrinter(cctx)
fmt.Println()

sess, err := api.ProcessSession(ctx)
if err != nil {
return xerrors.Errorf("getting session: %w", err)
}
fmt.Printf("Session: %s\n", sess)

enabled, err := api.Enabled(ctx)
if err != nil {
return xerrors.Errorf("checking worker status: %w", err)
}
fmt.Printf("Enabled: %t", enabled)

info, err := api.Info(ctx)
if err != nil {
return xerrors.Errorf("getting info: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions cmd/lotus-seal-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func main() {
runCmd,
infoCmd,
storageCmd,
setCmd,
waitQuietCmd,
}

app := &cli.App{
Expand Down
34 changes: 34 additions & 0 deletions cmd/lotus-seal-worker/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package main

import (
"context"
"github.com/google/uuid"
"sync/atomic"

"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
Expand All @@ -17,6 +19,8 @@ type worker struct {

localStore *stores.Local
ls stores.LocalStorage

disabled int64
}

func (w *worker) Version(context.Context) (build.Version, error) {
Expand All @@ -42,4 +46,34 @@ func (w *worker) StorageAddLocal(ctx context.Context, path string) error {
return nil
}

func (w *worker) SetEnabled(ctx context.Context, enabled bool) error {
disabled := int64(1)
if enabled {
disabled = 0
}
atomic.StoreInt64(&w.disabled, disabled)
return nil
}

func (w *worker) Enabled(ctx context.Context) (bool, error) {
return atomic.LoadInt64(&w.disabled) == 0, nil
}

func (w *worker) WaitQuiet(ctx context.Context) error {
w.LocalWorker.WaitQuiet() // uses WaitGroup under the hood so no ctx :/
return nil
}

func (w *worker) ProcessSession(ctx context.Context) (uuid.UUID, error) {
return w.LocalWorker.Session(ctx)
}

func (w *worker) Session(ctx context.Context) (uuid.UUID, error) {
if atomic.LoadInt64(&w.disabled) == 1 {
return uuid.UUID{}, xerrors.Errorf("worker disabled")
}

return w.LocalWorker.Session(ctx)
}

var _ storiface.WorkerCalls = &worker{}

0 comments on commit 033e1b0

Please sign in to comment.