Skip to content

Commit

Permalink
fix flaky TestDealPublisher and re-enable
Browse files (browse the repository at this point in the history)
fixes #6799
  • Loading branch information
Stebalien committed Aug 6, 2021
1 parent 9dca656 commit be2ecf6
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 14 deletions.
13 changes: 7 additions & 6 deletions markets/storageadapter/dealpublisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
Expand Down Expand Up @@ -203,9 +204,9 @@ func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) {
log.Infof("add deal with piece CID %s to publish deals queue - %d deals in queue (max queue size %d)",
pdeal.deal.Proposal.PieceCID, len(p.pending), p.maxDealsPerPublishMsg)

// If the maximum number of deals per message has been reached,
// send a publish message
if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg {
// If the maximum number of deals per message has been reached or we're not batching, send a
// publish message
if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg || p.publishPeriod == 0 {
log.Infof("publish deals queue has reached max size of %d, publishing deals", p.maxDealsPerPublishMsg)
p.publishAllDeals()
return
Expand All @@ -218,7 +219,7 @@ func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) {
func (p *DealPublisher) waitForMoreDeals() {
// Check if we're already waiting for deals
if !p.publishPeriodStart.IsZero() {
elapsed := time.Since(p.publishPeriodStart)
elapsed := build.Clock.Since(p.publishPeriodStart)
log.Infof("%s elapsed of / %s until publish deals queue is published",
elapsed, p.publishPeriod)
return
Expand All @@ -227,11 +228,11 @@ func (p *DealPublisher) waitForMoreDeals() {
// Set a timeout to wait for more deals to arrive
log.Infof("waiting publish deals queue period of %s before publishing", p.publishPeriod)
ctx, cancel := context.WithCancel(p.ctx)
p.publishPeriodStart = time.Now()
p.publishPeriodStart = build.Clock.Now()
p.cancelWaitForMoreDeals = cancel

go func() {
timer := time.NewTimer(p.publishPeriod)
timer := build.Clock.Timer(p.publishPeriod)
select {
case <-ctx.Done():
timer.Stop()
Expand Down
54 changes: 49 additions & 5 deletions markets/storageadapter/dealpublisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"github.com/filecoin-project/go-state-types/crypto"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/ipfs/go-cid"
"github.com/raulk/clock"

"github.com/stretchr/testify/require"

tutils "github.com/filecoin-project/specs-actors/v2/support/testing"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
Expand All @@ -25,7 +27,11 @@ import (
)

func TestDealPublisher(t *testing.T) {
t.Skip("this test randomly fails in various subtests; see issue #6799")
oldClock := build.Clock
t.Cleanup(func() { build.Clock = oldClock })
mc := clock.NewMock()
build.Clock = mc

testCases := []struct {
name string
publishPeriod time.Duration
Expand Down Expand Up @@ -92,6 +98,7 @@ func TestDealPublisher(t *testing.T) {
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
mc.Set(time.Now())
dpapi := newDPAPI(t)

// Create a deal publisher
Expand All @@ -116,14 +123,51 @@ func TestDealPublisher(t *testing.T) {
}

// Wait until publish period has elapsed
time.Sleep(2 * tc.publishPeriod)
if tc.publishPeriod > 0 {
// If we expect deals to get stuck in the queue, wait until that happens
if tc.maxDealsPerMsg != 0 && tc.dealCountWithinPublishPeriod%int(tc.maxDealsPerMsg) != 0 {
require.Eventually(t, func() bool {
dp.lk.Lock()
defer dp.lk.Unlock()
return !dp.publishPeriodStart.IsZero()
}, time.Second, time.Millisecond, "failed to queue deals")
}

// Then wait to send
require.Eventually(t, func() bool {
dp.lk.Lock()
defer dp.lk.Unlock()

// Advance if necessary.
if mc.Since(dp.publishPeriodStart) <= tc.publishPeriod {
dp.lk.Unlock()
mc.Set(dp.publishPeriodStart.Add(tc.publishPeriod + 1))
dp.lk.Lock()
}

return len(dp.pending) == 0
}, time.Second, time.Millisecond, "failed to send pending messages")
}

// Publish deals after publish period
for i := 0; i < tc.dealCountAfterPublishPeriod; i++ {
deal := publishDeal(t, dp, false, false)
dealsToPublish = append(dealsToPublish, deal)
}

if tc.publishPeriod > 0 && tc.dealCountAfterPublishPeriod > 0 {
require.Eventually(t, func() bool {
dp.lk.Lock()
defer dp.lk.Unlock()
if mc.Since(dp.publishPeriodStart) <= tc.publishPeriod {
dp.lk.Unlock()
mc.Set(dp.publishPeriodStart.Add(tc.publishPeriod + 1))
dp.lk.Lock()
}
return len(dp.pending) == 0
}, time.Second, time.Millisecond, "failed to send pending messages")
}

checkPublishedDeals(t, dpapi, dealsToPublish, tc.expectedDealsPerMsg)
})
}
Expand All @@ -133,7 +177,7 @@ func TestForcePublish(t *testing.T) {
dpapi := newDPAPI(t)

// Create a deal publisher
start := time.Now()
start := build.Clock.Now()
publishPeriod := time.Hour
dp := newDealPublisher(dpapi, nil, PublishMsgConfig{
Period: publishPeriod,
Expand All @@ -152,15 +196,15 @@ func TestForcePublish(t *testing.T) {
dealsToPublish = append(dealsToPublish, deal)

// Allow a moment for them to be queued
time.Sleep(10 * time.Millisecond)
build.Clock.Sleep(10 * time.Millisecond)

// Should be two deals in the pending deals list
// (deal with cancelled context is ignored)
pendingInfo := dp.PendingDeals()
require.Len(t, pendingInfo.Deals, 2)
require.Equal(t, publishPeriod, pendingInfo.PublishPeriod)
require.True(t, pendingInfo.PublishPeriodStart.After(start))
require.True(t, pendingInfo.PublishPeriodStart.Before(time.Now()))
require.True(t, pendingInfo.PublishPeriodStart.Before(build.Clock.Now()))

// Force publish all pending deals
dp.ForcePublishPendingDeals()
Expand Down
6 changes: 3 additions & 3 deletions markets/storageadapter/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,16 @@ func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagema
}

p, offset, err := n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo)
curTime := time.Now()
for time.Since(curTime) < addPieceRetryTimeout {
curTime := build.Clock.Now()
for build.Clock.Since(curTime) < addPieceRetryTimeout {
if !xerrors.Is(err, sealing.ErrTooManySectorsSealing) {
if err != nil {
log.Errorf("failed to addPiece for deal %d, err: %v", deal.DealID, err)
}
break
}
select {
case <-time.After(addPieceRetryWait):
case <-build.Clock.After(addPieceRetryWait):
p, offset, err = n.secb.AddPiece(ctx, pieceSize, pieceData, sdInfo)
case <-ctx.Done():
return nil, xerrors.New("context expired while waiting to retry AddPiece")
Expand Down

0 comments on commit be2ecf6

Please sign in to comment.