Skip to content

Commit

Permalink
Merge pull request #9878 from filecoin-project/gstuart/check-allocati…
Browse files Browse the repository at this point in the history
…on-expiry

feat: Check for allocation expiry when waiting to seal sectors
  • Loading branch information
magik6k authored Jan 27, 2023
2 parents db9c516 + 12a06de commit a0771b1
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 14 deletions.
24 changes: 20 additions & 4 deletions storage/pipeline/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,17 +143,33 @@ func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo,
if err != nil {
return false, xerrors.Errorf("API error getting head: %w", err)
}

var dealSafeSealEpoch abi.ChainEpoch
for _, piece := range sector.Pieces {
if piece.DealInfo == nil {
continue
}
dealSafeSealEpoch := piece.DealInfo.DealProposal.StartEpoch - cfg.StartEpochSealingBuffer
dealSafeSealTime := time.Now().Add(time.Duration(dealSafeSealEpoch-ts.Height()) * blockTime)
if dealSafeSealTime.Before(sealTime) {
sealTime = dealSafeSealTime

dealSafeSealEpoch = piece.DealInfo.DealProposal.StartEpoch - cfg.StartEpochSealingBuffer

alloc, _ := m.Api.StateGetAllocationForPendingDeal(ctx.Context(), piece.DealInfo.DealID, types.EmptyTSK)
// alloc is nil if this is not a verified deal in nv17 or later
if alloc == nil {
continue
}

if alloc.Expiration-cfg.StartEpochSealingBuffer < dealSafeSealEpoch {
dealSafeSealEpoch = alloc.Expiration - cfg.StartEpochSealingBuffer
log.Debugw("checking safe seal epoch", "dealSafeSealEpoch", dealSafeSealEpoch)
}
}

dealSafeSealTime := time.Now().Add(time.Duration(dealSafeSealEpoch-ts.Height()) * blockTime)
if dealSafeSealTime.Before(sealTime) {
log.Debugw("deal safe time is before seal time", "dealSafeSealTime", dealSafeSealTime, "sealTime", sealTime)
sealTime = dealSafeSealTime
}

if now.After(sealTime) {
log.Infow("starting to seal deal sector", "trigger", "wait-timeout", "creation", sector.CreationTime)
return true, ctx.Send(SectorStartPacking{})
Expand Down
17 changes: 17 additions & 0 deletions storage/pipeline/mocks/mock_precommit_batcher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 36 additions & 10 deletions storage/pipeline/precommit_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/builtin"
"github.com/filecoin-project/go-state-types/builtin/v9/miner"
verifregtypes "github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
"github.com/filecoin-project/go-state-types/network"

"github.com/filecoin-project/lotus/api"
Expand All @@ -34,6 +35,7 @@ type PreCommitBatcherApi interface {
StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (big.Int, error)
ChainHead(ctx context.Context) (*types.TipSet, error)
StateNetworkVersion(ctx context.Context, tsk types.TipSetKey) (network.Version, error)
StateGetAllocationForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (*verifregtypes.Allocation, error)

// Address selector
WalletBalance(context.Context, address.Address) (types.BigInt, error)
Expand Down Expand Up @@ -386,15 +388,25 @@ func (b *PreCommitBatcher) AddPreCommit(ctx context.Context, s SectorInfo, depos
return sealiface.PreCommitBatchRes{}, err
}

cutoff, err := getPreCommitCutoff(ts.Height(), s)
if err != nil {
return sealiface.PreCommitBatchRes{}, xerrors.Errorf("failed to calculate cutoff: %w", err)
dealStartCutoff := getDealStartCutoff(s)
if dealStartCutoff <= ts.Height() {
return sealiface.PreCommitBatchRes{}, xerrors.Errorf("cutoff has already passed (cutoff %d <= curEpoch %d)", dealStartCutoff, ts.Height())
}

// Allocation cutoff is a soft deadline, so don't fail if we've passed it.
allocationCutoff := b.getAllocationCutoff(s)

var cutoffEpoch abi.ChainEpoch
if dealStartCutoff < allocationCutoff {
cutoffEpoch = dealStartCutoff
} else {
cutoffEpoch = allocationCutoff
}

sn := s.SectorNumber

b.lk.Lock()
b.cutoffs[sn] = cutoff
b.cutoffs[sn] = time.Now().Add(time.Duration(cutoffEpoch-ts.Height()) * time.Duration(build.BlockDelaySecs) * time.Second)
b.todo[sn] = &preCommitEntry{
deposit: deposit,
pci: in,
Expand Down Expand Up @@ -471,8 +483,7 @@ func (b *PreCommitBatcher) Stop(ctx context.Context) error {
}
}

// TODO: If this returned epochs, it would make testing much easier
func getPreCommitCutoff(curEpoch abi.ChainEpoch, si SectorInfo) (time.Time, error) {
func getDealStartCutoff(si SectorInfo) abi.ChainEpoch {
cutoffEpoch := si.TicketEpoch + policy.MaxPreCommitRandomnessLookback
for _, p := range si.Pieces {
if p.DealInfo == nil {
Expand All @@ -485,9 +496,24 @@ func getPreCommitCutoff(curEpoch abi.ChainEpoch, si SectorInfo) (time.Time, erro
}
}

if cutoffEpoch <= curEpoch {
return time.Now(), xerrors.Errorf("cutoff has already passed (cutoff %d <= curEpoch %d)", cutoffEpoch, curEpoch)
}
return cutoffEpoch
}

func (b *PreCommitBatcher) getAllocationCutoff(si SectorInfo) abi.ChainEpoch {
cutoff := si.TicketEpoch + policy.MaxPreCommitRandomnessLookback
for _, p := range si.Pieces {
if p.DealInfo == nil {
continue
}

return time.Now().Add(time.Duration(cutoffEpoch-curEpoch) * time.Duration(build.BlockDelaySecs) * time.Second), nil
alloc, _ := b.api.StateGetAllocationForPendingDeal(b.mctx, p.DealInfo.DealID, types.EmptyTSK)
// alloc is nil if this is not a verified deal in nv17 or later
if alloc == nil {
continue
}
if alloc.Expiration < cutoff {
cutoff = alloc.Expiration
}
}
return cutoff
}

0 comments on commit a0771b1

Please sign in to comment.