Skip to content

Commit

Permalink
post worker sched: Retry on alternative worker on RPC errors
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Mar 6, 2023
1 parent 4b99472 commit 9a295e5
Showing 1 changed file with 26 additions and 7 deletions.
33 changes: 26 additions & 7 deletions storage/sealer/sched_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package sealer

import (
"context"
"errors"
"github.com/filecoin-project/go-jsonrpc"
"github.com/hashicorp/go-multierror"
"math/rand"
"sync"
"time"
Expand Down Expand Up @@ -102,15 +105,31 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg
}
}()

selected := candidates[0]
worker := ps.workers[selected.id]
var rpcErrs error

return worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
ps.lk.Unlock()
defer ps.lk.Lock()
for i, selected := range candidates {
worker := ps.workers[selected.id]

return work(ctx, worker.workerRpc)
})
err := worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
ps.lk.Unlock()
defer ps.lk.Lock()

return work(ctx, worker.workerRpc)
})
if err == nil {
return nil
}

// if the error is RPCConnectionError, try another worker, if not, return the error
if !errors.As(err, new(*jsonrpc.RPCConnectionError)) {
return err
}

log.Warnw("worker RPC connection error, will retry with another candidate if possible", "error", err, "worker", selected.id, "candidate", i, "candidates", len(candidates))
rpcErrs = multierror.Append(rpcErrs, err)
}

return xerrors.Errorf("got RPC errors from all workers: %w", rpcErrs)
}

type candidateWorker struct {
Expand Down

0 comments on commit 9a295e5

Please sign in to comment.