Skip to content

Commit

Permalink
Merge pull request #10394 from filecoin-project/fix/disabled-post-wor…
Browse files Browse the repository at this point in the history
…ker-handling

fix: wdpost: disabled post worker handling
  • Loading branch information
arajasek authored Mar 6, 2023
2 parents 12496e5 + 9c2f8ee commit fea7dfd
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 20 deletions.
2 changes: 2 additions & 0 deletions itests/kit/ensemble.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ func (n *Ensemble) Worker(minerNode *TestMiner, worker *TestWorker, opts ...Node
MinerNode: minerNode,
RemoteListener: rl,
options: options,

Stop: func(ctx context.Context) error { return nil },
}

n.inactive.workers = append(n.inactive.workers, worker)
Expand Down
6 changes: 6 additions & 0 deletions itests/kit/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func workerRpc(t *testing.T, m *TestWorker) *TestWorker {
require.NoError(t, err)
t.Cleanup(stop)

m.Stop = func(ctx context.Context) error {
srv.Close()
srv.CloseClientConnections()
return nil
}

m.ListenAddr, m.Worker = maddr, cl
return m
}
53 changes: 41 additions & 12 deletions itests/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {

sectors := 2 * 48 * 2

client, miner, _, ens := kit.EnsembleWorker(t,
client, miner, _, _ := kit.EnsembleWorker(t,
kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines
kit.LatestActorsAt(-1),
kit.ThroughRPC(),
Expand All @@ -378,17 +378,8 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
require.NoError(t, err)

bm := ens.InterconnectAll().BeginMiningMustPost(2 * time.Millisecond)[0]

di = di.NextNotElapsed()

t.Log("Running one proving period")
waitUntil := di.Open + di.WPoStChallengeWindow*2 - 2
client.WaitTillChain(ctx, kit.HeightAtLeast(waitUntil))

t.Log("Waiting for post message")
bm.Stop()

tryDl := func(dl uint64) {
p, err := miner.ComputeWindowPoSt(ctx, dl, types.EmptyTSK)
require.NoError(t, err)
Expand All @@ -398,10 +389,48 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) {
tryDl(0)
tryDl(40)
tryDl(di.Index + 4)
}

lastPending, err := client.MpoolPending(ctx, types.EmptyTSK)
func TestWindowPostWorkerDisconnected(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

_ = logging.SetLogLevel("storageminer", "INFO")

sectors := 2 * 48 * 2

_, miner, badWorker, ens := kit.EnsembleWorker(t,
kit.PresealSectors(sectors), // 2 sectors per partition, 2 partitions in all 48 deadlines
kit.LatestActorsAt(-1),
kit.ThroughRPC(),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}))

var goodWorker kit.TestWorker
ens.Worker(miner, &goodWorker, kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}), kit.ThroughRPC()).Start()

// wait for all workers
require.Eventually(t, func() bool {
w, err := miner.WorkerStats(ctx)
require.NoError(t, err)
return len(w) == 3 // 2 post + 1 miner-builtin
}, 10*time.Second, 100*time.Millisecond)

tryDl := func(dl uint64) {
p, err := miner.ComputeWindowPoSt(ctx, dl, types.EmptyTSK)
require.NoError(t, err)
require.Len(t, p, 1)
require.Equal(t, dl, p[0].Deadline)
}
tryDl(0) // this will run on the not-yet-bad badWorker

err := badWorker.Stop(ctx)
require.NoError(t, err)
require.Len(t, lastPending, 0)

tryDl(10) // will fail on the badWorker, then should retry on the goodWorker

time.Sleep(15 * time.Second)

tryDl(40) // after HeartbeatInterval, the badWorker should be marked as disabled
}

func TestSchedulerRemoveRequest(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion storage/sealer/manager_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (m *Manager) generateWindowPoSt(ctx context.Context, minerID abi.ActorID, s
skipped = append(skipped, sk...)

if err != nil {
retErr = multierr.Append(retErr, xerrors.Errorf("partitionCount:%d err:%+v", partIdx, err))
retErr = multierr.Append(retErr, xerrors.Errorf("partitionIndex:%d err:%+v", partIdx, err))
}
flk.Unlock()
}
Expand Down
38 changes: 31 additions & 7 deletions storage/sealer/sched_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package sealer

import (
"context"
"errors"
"math/rand"
"sync"
"time"

"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/lotus/storage/paths"
Expand Down Expand Up @@ -102,15 +105,31 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg
}
}()

selected := candidates[0]
worker := ps.workers[selected.id]
var rpcErrs error

return worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
ps.lk.Unlock()
defer ps.lk.Lock()
for i, selected := range candidates {
worker := ps.workers[selected.id]

return work(ctx, worker.workerRpc)
})
err := worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
ps.lk.Unlock()
defer ps.lk.Lock()

return work(ctx, worker.workerRpc)
})
if err == nil {
return nil
}

// if the error is RPCConnectionError, try another worker, if not, return the error
if !errors.As(err, new(*jsonrpc.RPCConnectionError)) {
return err
}

log.Warnw("worker RPC connection error, will retry with another candidate if possible", "error", err, "worker", selected.id, "candidate", i, "candidates", len(candidates))
rpcErrs = multierror.Append(rpcErrs, err)
}

return xerrors.Errorf("got RPC errors from all workers: %w", rpcErrs)
}

type candidateWorker struct {
Expand All @@ -124,6 +143,11 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand
for wid, wr := range ps.workers {
needRes := wr.Info.Resources.ResourceSpec(spt, ps.postType)

if !wr.Enabled {
log.Debugf("sched: not scheduling on PoSt-worker %s, worker disabled", wid)
continue
}

if !wr.active.CanHandleRequest(ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) {
continue
}
Expand Down

0 comments on commit fea7dfd

Please sign in to comment.