diff --git a/storage/pipeline/input.go b/storage/pipeline/input.go index 25c752e5f42..f06359c8737 100644 --- a/storage/pipeline/input.go +++ b/storage/pipeline/input.go @@ -143,17 +143,33 @@ func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo, if err != nil { return false, xerrors.Errorf("API error getting head: %w", err) } + + var dealSafeSealEpoch abi.ChainEpoch for _, piece := range sector.Pieces { if piece.DealInfo == nil { continue } - dealSafeSealEpoch := piece.DealInfo.DealProposal.StartEpoch - cfg.StartEpochSealingBuffer - dealSafeSealTime := time.Now().Add(time.Duration(dealSafeSealEpoch-ts.Height()) * blockTime) - if dealSafeSealTime.Before(sealTime) { - sealTime = dealSafeSealTime + + dealSafeSealEpoch = piece.DealInfo.DealProposal.StartEpoch - cfg.StartEpochSealingBuffer + + alloc, _ := m.Api.StateGetAllocationForPendingDeal(ctx.Context(), piece.DealInfo.DealID, types.EmptyTSK) + // alloc is nil if this is not a verified deal in nv17 or later + if alloc == nil { + continue + } + + if alloc.Expiration-cfg.StartEpochSealingBuffer < dealSafeSealEpoch { + dealSafeSealEpoch = alloc.Expiration - cfg.StartEpochSealingBuffer + log.Debugw("checking safe seal epoch", "dealSafeSealEpoch", dealSafeSealEpoch) } } + dealSafeSealTime := time.Now().Add(time.Duration(dealSafeSealEpoch-ts.Height()) * blockTime) + if dealSafeSealTime.Before(sealTime) { + log.Debugw("deal safe time is before seal time", "dealSafeSealTime", dealSafeSealTime, "sealTime", sealTime) + sealTime = dealSafeSealTime + } + if now.After(sealTime) { log.Infow("starting to seal deal sector", "trigger", "wait-timeout", "creation", sector.CreationTime) return true, ctx.Send(SectorStartPacking{}) diff --git a/storage/pipeline/mocks/mock_precommit_batcher.go b/storage/pipeline/mocks/mock_precommit_batcher.go index 2f65e3e034e..2d514eb7eb0 100644 --- a/storage/pipeline/mocks/mock_precommit_batcher.go +++ b/storage/pipeline/mocks/mock_precommit_batcher.go @@ -11,7 +11,9 @@ import ( gomock "github.com/golang/mock/gomock" address "github.com/filecoin-project/go-address" + abi "github.com/filecoin-project/go-state-types/abi" big "github.com/filecoin-project/go-state-types/big" + verifreg "github.com/filecoin-project/go-state-types/builtin/v9/verifreg" network "github.com/filecoin-project/go-state-types/network" api "github.com/filecoin-project/lotus/api" @@ -86,6 +88,21 @@ func (mr *MockPreCommitBatcherApiMockRecorder) StateAccountKey(arg0, arg1, arg2 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateAccountKey", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).StateAccountKey), arg0, arg1, arg2) } +// StateGetAllocationForPendingDeal mocks base method. +func (m *MockPreCommitBatcherApi) StateGetAllocationForPendingDeal(arg0 context.Context, arg1 abi.DealID, arg2 types.TipSetKey) (*verifreg.Allocation, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateGetAllocationForPendingDeal", arg0, arg1, arg2) + ret0, _ := ret[0].(*verifreg.Allocation) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateGetAllocationForPendingDeal indicates an expected call of StateGetAllocationForPendingDeal. +func (mr *MockPreCommitBatcherApiMockRecorder) StateGetAllocationForPendingDeal(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateGetAllocationForPendingDeal", reflect.TypeOf((*MockPreCommitBatcherApi)(nil).StateGetAllocationForPendingDeal), arg0, arg1, arg2) +} + // StateLookupID mocks base method. func (m *MockPreCommitBatcherApi) StateLookupID(arg0 context.Context, arg1 address.Address, arg2 types.TipSetKey) (address.Address, error) { m.ctrl.T.Helper() diff --git a/storage/pipeline/precommit_batch.go b/storage/pipeline/precommit_batch.go index 6ee6aed9353..9bc6243291b 100644 --- a/storage/pipeline/precommit_batch.go +++ b/storage/pipeline/precommit_batch.go @@ -15,6 +15,7 @@ import ( "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/go-state-types/builtin/v9/miner" + verifregtypes "github.com/filecoin-project/go-state-types/builtin/v9/verifreg" "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/lotus/api" @@ -34,6 +35,7 @@ type PreCommitBatcherApi interface { StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (big.Int, error) ChainHead(ctx context.Context) (*types.TipSet, error) StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error) + StateGetAllocationForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (*verifregtypes.Allocation, error) // Address selector WalletBalance(context.Context, address.Address) (types.BigInt, error) @@ -386,15 +388,25 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos return sealiface.PreCommitBatchRes{}, err } - cutoff, err := getPreCommitCutoff(ts.Height(), s) - if err != nil { - return sealiface.PreCommitBatchRes{}, xerrors.Errorf("failed to calculate cutoff: %w", err) + dealStartCutoff := getDealStartCutoff(s) + if dealStartCutoff <= ts.Height() { + return sealiface.PreCommitBatchRes{}, xerrors.Errorf("cutoff has already passed (cutoff %d <= curEpoch %d)", dealStartCutoff, ts.Height()) + } + + // Allocation cutoff is a soft deadline, so don't fail if we've passed it. + allocationCutoff := b.getAllocationCutoff(s) + + var cutoffEpoch abi.ChainEpoch + if dealStartCutoff < allocationCutoff { + cutoffEpoch = dealStartCutoff + } else { + cutoffEpoch = allocationCutoff } sn := s.SectorNumber b.lk.Lock() - b.cutoffs[sn] = cutoff + b.cutoffs[sn] = time.Now().Add(time.Duration(cutoffEpoch-ts.Height()) * time.Duration(build.BlockDelaySecs) * time.Second) b.todo[sn] = &preCommitEntry{ deposit: deposit, pci: in, @@ -471,8 +483,7 @@ func (b *PreCommitBatcher) Stop(ctx context.Context) error { } } -// TODO: If this returned epochs, it would make testing much easier -func getPreCommitCutoff(curEpoch abi.ChainEpoch, si SectorInfo) (time.Time, error) { +func getDealStartCutoff(si SectorInfo) abi.ChainEpoch { cutoffEpoch := si.TicketEpoch + policy.MaxPreCommitRandomnessLookback for _, p := range si.Pieces { if p.DealInfo == nil { @@ -485,9 +496,24 @@ func getPreCommitCutoff(curEpoch abi.ChainEpoch, si SectorInfo) (time.Time, erro } } - if cutoffEpoch <= curEpoch { - return time.Now(), xerrors.Errorf("cutoff has already passed (cutoff %d <= curEpoch %d)", cutoffEpoch, curEpoch) - } + return cutoffEpoch +} + +func (b *PreCommitBatcher) getAllocationCutoff(si SectorInfo) abi.ChainEpoch { + cutoff := si.TicketEpoch + policy.MaxPreCommitRandomnessLookback + for _, p := range si.Pieces { + if p.DealInfo == nil { + continue + } - return time.Now().Add(time.Duration(cutoffEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second), nil + alloc, _ := b.api.StateGetAllocationForPendingDeal(b.mctx, p.DealInfo.DealID, types.EmptyTSK) + // alloc is nil if this is not a verified deal in nv17 or later + if alloc == nil { + continue + } + if alloc.Expiration < cutoff { + cutoff = alloc.Expiration + } + } + return cutoff }