diff --git a/storage/sealer/sched_post.go b/storage/sealer/sched_post.go index 0421ee8a187..aa96f8eb37c 100644 --- a/storage/sealer/sched_post.go +++ b/storage/sealer/sched_post.go @@ -2,6 +2,9 @@ package sealer import ( "context" + "errors" + "github.com/filecoin-project/go-jsonrpc" + "github.com/hashicorp/go-multierror" "math/rand" "sync" "time" @@ -102,15 +105,31 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg } }() - selected := candidates[0] - worker := ps.workers[selected.id] + var rpcErrs error - return worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error { - ps.lk.Unlock() - defer ps.lk.Lock() + for i, selected := range candidates { + worker := ps.workers[selected.id] - return work(ctx, worker.workerRpc) - }) + err := worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error { + ps.lk.Unlock() + defer ps.lk.Lock() + + return work(ctx, worker.workerRpc) + }) + if err == nil { + return nil + } + + // if the error is RPCConnectionError, try another worker, if not, return the error + if !errors.As(err, new(*jsonrpc.RPCConnectionError)) { + return err + } + + log.Warnw("worker RPC connection error, will retry with another candidate if possible", "error", err, "worker", selected.id, "candidate", i, "candidates", len(candidates)) + rpcErrs = multierror.Append(rpcErrs, err) + } + + return xerrors.Errorf("got RPC errors from all workers: %w", rpcErrs) } type candidateWorker struct {