From d65467dd0c013201b4b666105df3b751b9d39ec5 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 8 Jan 2021 16:28:38 +0100 Subject: [PATCH] feat: batch publish deal messages --- api/test/deals.go | 300 +++++++++--------- api/test/test.go | 1 + markets/storageadapter/client.go | 8 +- markets/storageadapter/dealpublisher.go | 294 +++++++++++++++++ markets/storageadapter/dealpublisher_test.go | 227 +++++++++++++ markets/storageadapter/getcurrentdealinfo.go | 97 ++++-- .../storageadapter/getcurrentdealinfo_test.go | 193 +++++------ .../storageadapter/ondealsectorcommitted.go | 25 +- .../ondealsectorcommitted_test.go | 95 ++++-- markets/storageadapter/provider.go | 54 +--- node/builder.go | 3 + node/config/def.go | 15 + node/node_test.go | 11 +- node/test/builder.go | 11 +- 14 files changed, 988 insertions(+), 346 deletions(-) create mode 100644 markets/storageadapter/dealpublisher.go create mode 100644 markets/storageadapter/dealpublisher_test.go diff --git a/api/test/deals.go b/api/test/deals.go index 1189f070e14..10b761a2159 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -8,103 +8,45 @@ import ( "math/rand" "os" "path/filepath" - "sync/atomic" "testing" "time" - "github.com/filecoin-project/go-state-types/abi" - - "github.com/stretchr/testify/require" - "github.com/ipfs/go-cid" files "github.com/ipfs/go-ipfs-files" "github.com/ipld/go-car" + "github.com/stretchr/testify/require" "github.com/filecoin-project/go-fil-markets/storagemarket" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/types" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/markets/storageadapter" + "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/impl" + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" + ipld "github.com/ipfs/go-ipld-format" dag "github.com/ipfs/go-merkledag" dstest "github.com/ipfs/go-merkledag/test" unixfile "github.com/ipfs/go-unixfs/file" - - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/node/impl" - ipld "github.com/ipfs/go-ipld-format" ) func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport, fastRet bool, startEpoch abi.ChainEpoch) { + s := setupOneClientOneMiner(t, b, blocktime) + defer s.blockMiner.Stop() - ctx := context.Background() - n, sn := b(t, OneFull, OneMiner) - client := n[0].FullNode.(*impl.FullNodeAPI) - miner := sn[0] - - addrinfo, err := client.NetAddrsListen(ctx) - if err != nil { - t.Fatal(err) - } - - if err := miner.NetConnect(ctx, addrinfo); err != nil { - t.Fatal(err) - } - time.Sleep(time.Second) - - mine := int64(1) - done := make(chan struct{}) - go func() { - defer close(done) - for atomic.LoadInt64(&mine) == 1 { - time.Sleep(blocktime) - if err := sn[0].MineOne(ctx, MineNext); err != nil { - t.Error(err) - } - } - }() - - MakeDeal(t, ctx, 6, client, miner, carExport, fastRet, startEpoch) - - atomic.AddInt64(&mine, -1) - fmt.Println("shutting down mining") - <-done + MakeDeal(t, s.ctx, 6, s.client, s.miner, carExport, fastRet, startEpoch) } func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { + s := setupOneClientOneMiner(t, b, blocktime) + defer s.blockMiner.Stop() - ctx := context.Background() - n, sn := b(t, OneFull, OneMiner) - client := n[0].FullNode.(*impl.FullNodeAPI) - miner := sn[0] - - addrinfo, err := client.NetAddrsListen(ctx) - if err != nil { - t.Fatal(err) - } - - if err := miner.NetConnect(ctx, addrinfo); err != nil { - t.Fatal(err) - } - time.Sleep(time.Second) - - mine := int64(1) - done := make(chan struct{}) - - go func() { - defer close(done) - for atomic.LoadInt64(&mine) == 1 { - time.Sleep(blocktime) - if err := sn[0].MineOne(ctx, MineNext); err != nil { - t.Error(err) - } - } - }() - - MakeDeal(t, ctx, 6, client, miner, false, false, startEpoch) - MakeDeal(t, ctx, 7, client, miner, false, false, startEpoch) - - atomic.AddInt64(&mine, -1) - fmt.Println("shutting down mining") - <-done + MakeDeal(t, s.ctx, 6, s.client, s.miner, false, false, startEpoch) + MakeDeal(t, s.ctx, 7, s.client, s.miner, false, false, startEpoch) } func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, miner TestStorageNode, carExport, fastRet bool, startEpoch abi.ChainEpoch) { @@ -151,96 +93,133 @@ func CreateClientFile(ctx context.Context, client api.FullNode, rseed int) (*api return res, data, nil } -func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { - - ctx := context.Background() - n, sn := b(t, OneFull, OneMiner) +func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { + publishPeriod := 10 * time.Second + maxDealsPerMsg := uint64(2) + + // Set max deals per publish deals message to 2 + minerDef := []StorageMiner{{ + Full: 0, + Opts: node.Override( + new(*storageadapter.DealPublisher), + storageadapter.NewDealPublisher(nil, &config.PublishMsgConfig{ + PublishPeriod: config.Duration(publishPeriod), + MaxDealsPerMsg: maxDealsPerMsg, + })), + Preseal: PresealGenesis, + }} + + // Create a connect client and miner node + n, sn := b(t, OneFull, minerDef) client := n[0].FullNode.(*impl.FullNodeAPI) miner := sn[0] + s := connectAndStartMining(t, b, blocktime, client, miner) + defer s.blockMiner.Stop() - addrinfo, err := client.NetAddrsListen(ctx) - if err != nil { - t.Fatal(err) + // Starts a deal and waits until it's published + runDealTillPublish := func(rseed int) { + res, _, err := CreateClientFile(s.ctx, s.client, rseed) + require.NoError(t, err) + + upds, err := client.ClientGetDealUpdates(s.ctx) + require.NoError(t, err) + + startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch) + + // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this + time.Sleep(time.Second) + + done := make(chan struct{}) + go func() { + for upd := range upds { + if upd.DataRef.Root == res.Root && upd.State == storagemarket.StorageDealAwaitingPreCommit { + done <- struct{}{} + } + } + }() + <-done } - if err := miner.NetConnect(ctx, addrinfo); err != nil { - t.Fatal(err) + // Run three deals in parallel + done := make(chan struct{}, maxDealsPerMsg+1) + for rseed := 1; rseed <= 3; rseed++ { + rseed := rseed + go func() { + runDealTillPublish(rseed) + done <- struct{}{} + }() } - time.Sleep(time.Second) - mine := int64(1) - done := make(chan struct{}) - go func() { - defer close(done) - for atomic.LoadInt64(&mine) == 1 { - time.Sleep(blocktime) - if err := sn[0].MineOne(ctx, MineNext); err != nil { - t.Error(err) - } + // Wait for two of the deals to be published + for i := 0; i < int(maxDealsPerMsg); i++ { + <-done + } + + // Expect a single PublishStorageDeals message that includes the first two deals + msgCids, err := s.client.StateListMessages(s.ctx, &api.MessageMatch{To: market.Address}, types.EmptyTSK, 1) + require.NoError(t, err) + count := 0 + for _, msgCid := range msgCids { + msg, err := s.client.ChainGetMessage(s.ctx, msgCid) + require.NoError(t, err) + + if msg.Method == market.Methods.PublishStorageDeals { + count++ + var pubDealsParams market2.PublishStorageDealsParams + err = pubDealsParams.UnmarshalCBOR(bytes.NewReader(msg.Params)) + require.NoError(t, err) + require.Len(t, pubDealsParams.Deals, int(maxDealsPerMsg)) } - }() + } + require.Equal(t, 1, count) + + // The third deal should be published once the publish period expires. + // Allow a little padding as it takes a moment for the state change to + // be noticed by the client. + padding := 10 * time.Second + select { + case <-time.After(publishPeriod + padding): + require.Fail(t, "Expected 3rd deal to be published once publish period elapsed") + case <-done: // Success + } +} + +func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { + s := setupOneClientOneMiner(t, b, blocktime) + defer s.blockMiner.Stop() data := make([]byte, 1600) rand.New(rand.NewSource(int64(8))).Read(data) r := bytes.NewReader(data) - fcid, err := client.ClientImportLocal(ctx, r) + fcid, err := s.client.ClientImportLocal(s.ctx, r) if err != nil { t.Fatal(err) } fmt.Println("FILE CID: ", fcid) - deal := startDeal(t, ctx, miner, client, fcid, true, startEpoch) + deal := startDeal(t, s.ctx, s.miner, s.client, fcid, true, startEpoch) - waitDealPublished(t, ctx, miner, deal) + waitDealPublished(t, s.ctx, s.miner, deal) fmt.Println("deal published, retrieving") // Retrieval - info, err := client.ClientGetDealInfo(ctx, *deal) + info, err := s.client.ClientGetDealInfo(s.ctx, *deal) require.NoError(t, err) - testRetrieval(t, ctx, client, fcid, &info.PieceCID, false, data) - atomic.AddInt64(&mine, -1) - fmt.Println("shutting down mining") - <-done + testRetrieval(t, s.ctx, s.client, fcid, &info.PieceCID, false, data) } func TestSecondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration) { - - ctx := context.Background() - n, sn := b(t, OneFull, OneMiner) - client := n[0].FullNode.(*impl.FullNodeAPI) - miner := sn[0] - - addrinfo, err := client.NetAddrsListen(ctx) - if err != nil { - t.Fatal(err) - } - - if err := miner.NetConnect(ctx, addrinfo); err != nil { - t.Fatal(err) - } - time.Sleep(time.Second) - - mine := int64(1) - done := make(chan struct{}) - - go func() { - defer close(done) - for atomic.LoadInt64(&mine) == 1 { - time.Sleep(blocktime) - if err := sn[0].MineOne(ctx, MineNext); err != nil { - t.Error(err) - } - } - }() + s := setupOneClientOneMiner(t, b, blocktime) + defer s.blockMiner.Stop() { data1 := make([]byte, 800) rand.New(rand.NewSource(int64(3))).Read(data1) r := bytes.NewReader(data1) - fcid1, err := client.ClientImportLocal(ctx, r) + fcid1, err := s.client.ClientImportLocal(s.ctx, r) if err != nil { t.Fatal(err) } @@ -249,35 +228,31 @@ func TestSecondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration rand.New(rand.NewSource(int64(9))).Read(data2) r2 := bytes.NewReader(data2) - fcid2, err := client.ClientImportLocal(ctx, r2) + fcid2, err := s.client.ClientImportLocal(s.ctx, r2) if err != nil { t.Fatal(err) } - deal1 := startDeal(t, ctx, miner, client, fcid1, true, 0) + deal1 := startDeal(t, s.ctx, s.miner, s.client, fcid1, true, 0) // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this time.Sleep(time.Second) - waitDealSealed(t, ctx, miner, client, deal1, true) + waitDealSealed(t, s.ctx, s.miner, s.client, deal1, true) - deal2 := startDeal(t, ctx, miner, client, fcid2, true, 0) + deal2 := startDeal(t, s.ctx, s.miner, s.client, fcid2, true, 0) time.Sleep(time.Second) - waitDealSealed(t, ctx, miner, client, deal2, false) + waitDealSealed(t, s.ctx, s.miner, s.client, deal2, false) // Retrieval - info, err := client.ClientGetDealInfo(ctx, *deal2) + info, err := s.client.ClientGetDealInfo(s.ctx, *deal2) require.NoError(t, err) - rf, _ := miner.SectorsRefs(ctx) + rf, _ := s.miner.SectorsRefs(s.ctx) fmt.Printf("refs: %+v\n", rf) - testRetrieval(t, ctx, client, fcid2, &info.PieceCID, false, data2) + testRetrieval(t, s.ctx, s.client, fcid2, &info.PieceCID, false, data2) } - - atomic.AddInt64(&mine, -1) - fmt.Println("shutting down mining") - <-done } func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, fcid cid.Cid, fastRet bool, startEpoch abi.ChainEpoch) *cid.Cid { @@ -459,3 +434,40 @@ func extractCarData(t *testing.T, ctx context.Context, rdata []byte, rpath strin } return rdata } + +type dealsScaffold struct { + ctx context.Context + client *impl.FullNodeAPI + miner TestStorageNode + blockMiner *BlockMiner +} + +func setupOneClientOneMiner(t *testing.T, b APIBuilder, blocktime time.Duration) *dealsScaffold { + n, sn := b(t, OneFull, OneMiner) + client := n[0].FullNode.(*impl.FullNodeAPI) + miner := sn[0] + return connectAndStartMining(t, b, blocktime, client, miner) +} + +func connectAndStartMining(t *testing.T, b APIBuilder, blocktime time.Duration, client *impl.FullNodeAPI, miner TestStorageNode) *dealsScaffold { + ctx := context.Background() + addrinfo, err := client.NetAddrsListen(ctx) + if err != nil { + t.Fatal(err) + } + + if err := miner.NetConnect(ctx, addrinfo); err != nil { + t.Fatal(err) + } + time.Sleep(time.Second) + + blockMiner := NewBlockMiner(ctx, t, miner, blocktime) + blockMiner.MineBlocks() + + return &dealsScaffold{ + ctx: ctx, + client: client, + miner: miner, + blockMiner: blockMiner, + } +} diff --git a/api/test/test.go b/api/test/test.go index a1b82c590a8..89c0a0f80bd 100644 --- a/api/test/test.go +++ b/api/test/test.go @@ -57,6 +57,7 @@ const GenesisPreseals = 2 // Options for setting up a mock storage miner type StorageMiner struct { Full int + Opts node.Option Preseal int } diff --git a/markets/storageadapter/client.go b/markets/storageadapter/client.go index 88a50931a18..6c6f7c3dd0a 100644 --- a/markets/storageadapter/client.go +++ b/markets/storageadapter/client.go @@ -136,6 +136,7 @@ func (c *ClientNodeAdapter) GetBalance(ctx context.Context, addr address.Address // ValidatePublishedDeal validates that the provided deal has appeared on chain and references the same ClientDeal // returns the Deal id if there is no error +// TODO: Don't return deal ID func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal storagemarket.ClientDeal) (abi.DealID, error) { log.Infow("DEAL ACCEPTED!") @@ -217,14 +218,17 @@ func (c *ClientNodeAdapter) DealProviderCollateralBounds(ctx context.Context, si return big.Mul(bounds.Min, big.NewInt(clientOverestimation)), bounds.Max, nil } +// TODO: Remove dealID parameter func (c *ClientNodeAdapter) OnDealSectorPreCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error { - return OnDealSectorPreCommitted(ctx, c, c.ev, provider, dealID, marketactor.DealProposal(proposal), publishCid, cb) + return OnDealSectorPreCommitted(ctx, c, c.ev, provider, marketactor.DealProposal(proposal), publishCid, cb) } +// TODO: Remove dealID parameter func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error { - return OnDealSectorCommitted(ctx, c, c.ev, provider, dealID, sectorNumber, marketactor.DealProposal(proposal), publishCid, cb) + return OnDealSectorCommitted(ctx, c, c.ev, provider, sectorNumber, marketactor.DealProposal(proposal), publishCid, cb) } +// TODO: Replace dealID parameter with DealProposal func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error { head, err := c.ChainHead(ctx) if err != nil { diff --git a/markets/storageadapter/dealpublisher.go b/markets/storageadapter/dealpublisher.go new file mode 100644 index 00000000000..0d1e6165539 --- /dev/null +++ b/markets/storageadapter/dealpublisher.go @@ -0,0 +1,294 @@ +package storageadapter + +import ( + "context" + "strings" + "sync" + "time" + + "go.uber.org/fx" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/node/config" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" + + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/types" + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" +) + +type dealPublisherAPI interface { + MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) + StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error) +} + +// DealPublisher batches deal publishing so that many deals can be included in +// a single publish message. This saves gas for miners that publish deals +// frequently. +// When a deal is submitted, the DealPublisher waits a configurable amount of +// time for other deals to be submitted before sending the publish message. +// There is a configurable maximum number of deals that can be included in one +// message. When the limit is reached the DealPublisher immediately submits a +// publish message with all deals in the queue. +type DealPublisher struct { + api dealPublisherAPI + + ctx context.Context + Shutdown context.CancelFunc + + maxDealsPerPublishMsg uint64 + publishPeriod time.Duration + publishSpec *api.MessageSendSpec + + lk sync.Mutex + pending []*pendingDeal + cancelWaitForMoreDeals context.CancelFunc + publishPeriodStart time.Time +} + +func NewDealPublisher( + feeConfig *config.MinerFeeConfig, + publishMsgCfg *config.PublishMsgConfig, +) func(lc fx.Lifecycle, full api.FullNode) *DealPublisher { + return func(lc fx.Lifecycle, full api.FullNode) *DealPublisher { + maxFee := abi.NewTokenAmount(0) + if feeConfig != nil { + maxFee = abi.TokenAmount(feeConfig.MaxPublishDealsFee) + } + publishSpec := &api.MessageSendSpec{MaxFee: maxFee} + dp := newDealPublisher(full, publishMsgCfg, publishSpec) + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + dp.Shutdown() + return nil + }, + }) + return dp + } +} + +func newDealPublisher( + dpapi dealPublisherAPI, + publishMsgCfg *config.PublishMsgConfig, + publishSpec *api.MessageSendSpec, +) *DealPublisher { + publishPeriod := time.Duration(0) + maxDealsPerMsg := uint64(1) + if publishMsgCfg != nil { + publishPeriod = time.Duration(publishMsgCfg.PublishPeriod) + maxDealsPerMsg = publishMsgCfg.MaxDealsPerMsg + } + ctx, cancel := context.WithCancel(context.Background()) + return &DealPublisher{ + api: dpapi, + ctx: ctx, + Shutdown: cancel, + maxDealsPerPublishMsg: maxDealsPerMsg, + publishPeriod: publishPeriod, + publishSpec: publishSpec, + } +} + +func (p *DealPublisher) Publish(ctx context.Context, deal market2.ClientDealProposal) (cid.Cid, error) { + pdeal := newPendingDeal(ctx, deal) + + // Add the deal to the queue + p.processNewDeal(pdeal) + + // Wait for the deal to be submitted + select { + case <-ctx.Done(): + return cid.Undef, ctx.Err() + case res := <-pdeal.Result: + return res.msgCid, res.err + } +} + +func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) { + p.lk.Lock() + defer p.lk.Unlock() + + // Filter out any cancelled deals + p.filterCancelledDeals() + + // If all deals have been cancelled, clear the wait-for-deals timer + if len(p.pending) == 0 && p.cancelWaitForMoreDeals != nil { + p.cancelWaitForMoreDeals() + p.cancelWaitForMoreDeals = nil + } + + // Make sure the new deal hasn't been cancelled + if pdeal.ctx.Err() != nil { + return + } + + // Add the new deal to the queue + p.pending = append(p.pending, pdeal) + log.Infof("add deal with piece CID %s to publish deals queue - %d deals in queue (max queue size %d)", + pdeal.deal.Proposal.PieceCID, len(p.pending), p.maxDealsPerPublishMsg) + + // If the maximum number of deals per message has been reached, + // send a publish message + if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg { + log.Infof("publish deals queue has reached max size of %d, publishing deals", p.maxDealsPerPublishMsg) + p.publishAllDeals() + return + } + + // Otherwise wait for more deals to arrive or the timeout to be reached + p.waitForMoreDeals() +} + +func (p *DealPublisher) waitForMoreDeals() { + // Check if we're already waiting for deals + if !p.publishPeriodStart.IsZero() { + elapsed := time.Since(p.publishPeriodStart) + log.Infof("%s elapsed of / %s until publish deals queue is published", + elapsed, p.publishPeriod) + return + } + + // Set a timeout to wait for more deals to arrive + log.Infof("waiting publish deals queue period of %s before publishing", p.publishPeriod) + ctx, cancel := context.WithCancel(p.ctx) + p.publishPeriodStart = time.Now() + p.cancelWaitForMoreDeals = cancel + + go func() { + timer := time.NewTimer(p.publishPeriod) + select { + case <-ctx.Done(): + timer.Stop() + case <-timer.C: + p.lk.Lock() + defer p.lk.Unlock() + + // The timeout has expired so publish all pending deals + log.Infof("publish deals queue period of %s has expired, publishing deals", p.publishPeriod) + p.publishAllDeals() + } + }() +} + +func (p *DealPublisher) publishAllDeals() { + // If the timeout hasn't yet been cancelled, cancel it + if p.cancelWaitForMoreDeals != nil { + p.cancelWaitForMoreDeals() + p.cancelWaitForMoreDeals = nil + p.publishPeriodStart = time.Time{} + } + + // Filter out any deals that have been cancelled + p.filterCancelledDeals() + deals := p.pending[:] + p.pending = nil + + // Send the publish message + go p.publishReady(deals) +} + +func (p *DealPublisher) publishReady(ready []*pendingDeal) { + if len(ready) == 0 { + return + } + + deals := make([]market2.ClientDealProposal, 0, len(ready)) + for _, pd := range ready { + deals = append(deals, pd.deal) + } + + // Send the publish message + msgCid, err := p.publishDealProposals(deals) + + // Signal that each deal has been published + for _, pd := range ready { + pd := pd + go func() { + res := publishResult{ + msgCid: msgCid, + err: err, + } + select { + case <-p.ctx.Done(): + case pd.Result <- res: + } + }() + } +} + +// Sends the publish message +func (p *DealPublisher) publishDealProposals(deals []market2.ClientDealProposal) (cid.Cid, error) { + log.Infof("publishing %d deals in publish deals queue with piece CIDs: %s", len(deals), pieceCids(deals)) + + provider := deals[0].Proposal.Provider + mi, err := p.api.StateMinerInfo(p.ctx, provider, types.EmptyTSK) + if err != nil { + return cid.Undef, err + } + + params, err := actors.SerializeParams(&market2.PublishStorageDealsParams{ + Deals: deals, + }) + + if err != nil { + return cid.Undef, xerrors.Errorf("serializing PublishStorageDeals params failed: %w", err) + } + + smsg, err := p.api.MpoolPushMessage(p.ctx, &types.Message{ + To: market.Address, + From: mi.Worker, + Value: types.NewInt(0), + Method: market.Methods.PublishStorageDeals, + Params: params, + }, p.publishSpec) + + if err != nil { + return cid.Undef, err + } + return smsg.Cid(), nil +} + +func pieceCids(deals []market2.ClientDealProposal) string { + cids := make([]string, 0, len(deals)) + for _, dl := range deals { + cids = append(cids, dl.Proposal.PieceCID.String()) + } + return strings.Join(cids, ", ") +} + +// filter out deals that have been cancelled +func (p *DealPublisher) filterCancelledDeals() { + i := 0 + for _, pd := range p.pending { + if pd.ctx.Err() == nil { + p.pending[i] = pd + i++ + } + } + p.pending = p.pending[:i] +} + +type publishResult struct { + msgCid cid.Cid + err error +} + +type pendingDeal struct { + ctx context.Context + deal market2.ClientDealProposal + Result chan publishResult +} + +func newPendingDeal(ctx context.Context, deal market2.ClientDealProposal) *pendingDeal { + return &pendingDeal{ + ctx: ctx, + deal: deal, + Result: make(chan publishResult), + } +} diff --git a/markets/storageadapter/dealpublisher_test.go b/markets/storageadapter/dealpublisher_test.go new file mode 100644 index 00000000000..77e8e65f84e --- /dev/null +++ b/markets/storageadapter/dealpublisher_test.go @@ -0,0 +1,227 @@ +package storageadapter + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/filecoin-project/go-state-types/crypto" + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" + "github.com/ipfs/go-cid" + + "github.com/stretchr/testify/require" + + tutils "github.com/filecoin-project/specs-actors/v2/support/testing" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/types" + market0 "github.com/filecoin-project/specs-actors/actors/builtin/market" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/node/config" +) + +func TestDealPublisher(t *testing.T) { + testCases := []struct { + name string + publishPeriod time.Duration + maxDealsPerMsg uint64 + dealCountWithinPublishPeriod int + expiredWithinPublishPeriod int + dealCountAfterPublishPeriod int + expectedDealsPerMsg []int + }{{ + name: "publish one deal within publish period", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 5, + dealCountWithinPublishPeriod: 1, + dealCountAfterPublishPeriod: 0, + expectedDealsPerMsg: []int{1}, + }, { + name: "publish two deals within publish period", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 5, + dealCountWithinPublishPeriod: 2, + dealCountAfterPublishPeriod: 0, + expectedDealsPerMsg: []int{2}, + }, { + name: "publish one deal within publish period, and one after", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 5, + dealCountWithinPublishPeriod: 1, + dealCountAfterPublishPeriod: 1, + expectedDealsPerMsg: []int{1, 1}, + }, { + name: "publish deals that exceed max deals per message within publish period, and one after", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 2, + dealCountWithinPublishPeriod: 3, + dealCountAfterPublishPeriod: 1, + expectedDealsPerMsg: []int{2, 1, 1}, + }, { + name: "ignore expired deals", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 5, + dealCountWithinPublishPeriod: 2, + expiredWithinPublishPeriod: 2, + dealCountAfterPublishPeriod: 1, + expectedDealsPerMsg: []int{2, 1}, + }} + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + client := tutils.NewActorAddr(t, "client") + provider := tutils.NewActorAddr(t, "provider") + worker := tutils.NewActorAddr(t, "worker") + dpapi := newDPAPI(t, worker) + + // Create a deal publisher + dp := newDealPublisher(dpapi, &config.PublishMsgConfig{ + PublishPeriod: config.Duration(tc.publishPeriod), + MaxDealsPerMsg: tc.maxDealsPerMsg, + }, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)}) + + // Keep a record of the deals that were submitted to be published + var dealsToPublish []market.ClientDealProposal + publishDeal := func(expired bool) { + pctx := ctx + var cancel context.CancelFunc + if expired { + pctx, cancel = context.WithCancel(ctx) + cancel() + } + + deal := market.ClientDealProposal{ + Proposal: market0.DealProposal{ + PieceCID: generateCids(1)[0], + Client: client, + Provider: provider, + }, + ClientSignature: crypto.Signature{ + Type: crypto.SigTypeSecp256k1, + Data: []byte("signature data"), + }, + } + if !expired { + dealsToPublish = append(dealsToPublish, deal) + } + go func() { + _, err := dp.Publish(pctx, deal) + if expired { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }() + } + + // Publish deals within publish period + for i := 0; i < tc.dealCountWithinPublishPeriod; i++ { + publishDeal(false) + } + for i := 0; i < tc.expiredWithinPublishPeriod; i++ { + publishDeal(true) + } + + // Wait until publish period has elapsed + time.Sleep(2 * tc.publishPeriod) + + // Publish deals after publish period + for i := 0; i < tc.dealCountAfterPublishPeriod; i++ { + publishDeal(false) + } + + // For each message that was expected to be sent + var publishedDeals []market.ClientDealProposal + for _, expectedDealsInMsg := range tc.expectedDealsPerMsg { + // Should have called StateMinerInfo with the provider address + stateMinerInfoAddr := <-dpapi.stateMinerInfoCalls + require.Equal(t, provider, stateMinerInfoAddr) + + // Check the fields of the message that was sent + msg := <-dpapi.pushedMsgs + require.Equal(t, worker, msg.From) + require.Equal(t, market.Address, msg.To) + require.Equal(t, market.Methods.PublishStorageDeals, msg.Method) + + // Check that the expected number of deals was included in the message + var params market2.PublishStorageDealsParams + err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)) + require.NoError(t, err) + require.Len(t, params.Deals, expectedDealsInMsg) + + // Keep track of the deals that were sent + for _, d := range params.Deals { + publishedDeals = append(publishedDeals, d) + } + } + + // Verify that all deals that were submitted to be published were + // sent out (we do this by ensuring all the piece CIDs are present) + require.True(t, matchPieceCids(publishedDeals, dealsToPublish)) + }) + } +} + +func matchPieceCids(sent []market.ClientDealProposal, exp []market.ClientDealProposal) bool { + cidsA := dealPieceCids(sent) + cidsB := dealPieceCids(exp) + + if len(cidsA) != len(cidsB) { + return false + } + + s1 := cid.NewSet() + for _, c := range cidsA { + s1.Add(c) + } + + for _, c := range cidsB { + if !s1.Has(c) { + return false + } + } + + return true +} + +func dealPieceCids(deals []market2.ClientDealProposal) []cid.Cid { + cids := make([]cid.Cid, 0, len(deals)) + for _, dl := range deals { + cids = append(cids, dl.Proposal.PieceCID) + } + return cids +} + +type dpAPI struct { + t *testing.T + worker address.Address + + stateMinerInfoCalls chan address.Address + pushedMsgs chan *types.Message +} + +func newDPAPI(t *testing.T, worker address.Address) *dpAPI { + return &dpAPI{ + t: t, + worker: worker, + stateMinerInfoCalls: make(chan address.Address, 128), + pushedMsgs: make(chan *types.Message, 128), + } +} + +func (d *dpAPI) StateMinerInfo(ctx context.Context, address address.Address, key types.TipSetKey) (miner.MinerInfo, error) { + d.stateMinerInfoCalls <- address + return miner.MinerInfo{Worker: d.worker}, nil +} + +func (d *dpAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) { + d.pushedMsgs <- msg + return &types.SignedMessage{Message: *msg}, nil +} diff --git a/markets/storageadapter/getcurrentdealinfo.go b/markets/storageadapter/getcurrentdealinfo.go index ab8c3f52fc6..79fc9be74c2 100644 --- a/markets/storageadapter/getcurrentdealinfo.go +++ b/markets/storageadapter/getcurrentdealinfo.go @@ -8,72 +8,107 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/types" + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" "github.com/ipfs/go-cid" "golang.org/x/xerrors" ) type getCurrentDealInfoAPI interface { + ChainGetMessage(ctx context.Context, mc cid.Cid) (*types.Message, error) StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error) - StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error) + StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) } -// GetCurrentDealInfo gets current information on a deal, and corrects the deal ID as needed -func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, error) { - marketDeal, dealErr := api.StateMarketStorageDeal(ctx, dealID, ts.Key()) - if dealErr == nil { +// GetCurrentDealInfo gets the current deal state and deal ID. +// Note that the deal ID is assigned when the deal is published, so it may +// have changed if there was a reorg after the deal was published. +func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, error) { + if publishCid == nil { + return abi.DealID(0), nil, xerrors.Errorf("could not get deal info for nil publish deals CID") + } + + // Lookup the deal ID by comparing the deal proposal to the proposals in + // the publish deals message, and indexing into the message return value + dealID, err := dealIDFromPublishDealsMsg(ctx, ts, api, proposal, *publishCid) + if err != nil { + return dealID, nil, err + } + + // Lookup the deal state by deal ID + marketDeal, err := api.StateMarketStorageDeal(ctx, dealID, ts.Key()) + if err == nil { + // Make sure the retrieved deal proposal matches the target proposal equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal) if err != nil { return dealID, nil, err } - if equal { - return dealID, marketDeal, nil + if !equal { + return dealID, nil, xerrors.Errorf("Deal proposals did not match") } - dealErr = xerrors.Errorf("Deal proposals did not match") - } - if publishCid == nil { - return dealID, nil, dealErr } - // attempt deal id correction - lookup, err := api.StateSearchMsg(ctx, *publishCid) + return dealID, marketDeal, err +} + +// findDealID looks up the publish deals message by cid, and finds the deal ID +// by looking at the message return value +func dealIDFromPublishDealsMsg(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, proposal market.DealProposal, publishCid cid.Cid) (abi.DealID, error) { + dealID := abi.DealID(0) + + // Get the return value of the publish deals message + lookup, err := api.StateWaitMsg(ctx, publishCid, build.MessageConfidence) if err != nil { - return dealID, nil, err + return dealID, xerrors.Errorf("looking for publish deal message %s: search msg failed: %w", publishCid, err) } if lookup.Receipt.ExitCode != exitcode.Ok { - return dealID, nil, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", *publishCid, lookup.Receipt.ExitCode) + return dealID, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", publishCid, lookup.Receipt.ExitCode) } var retval market.PublishStorageDealsReturn if err := retval.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil { - return dealID, nil, xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err) + return dealID, xerrors.Errorf("looking for publish deal message %s: unmarshalling message return: %w", publishCid, err) } - if len(retval.IDs) != 1 { - // market currently only ever sends messages with 1 deal - return dealID, nil, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal") + // Get the parameters to the publish deals message + publishCid = lookup.Message + pubmsg, err := api.ChainGetMessage(ctx, publishCid) + if err != nil { + return dealID, xerrors.Errorf("getting publish deal message %s: %w", publishCid, err) } - if retval.IDs[0] == dealID { - // DealID did not change, so we are stuck with the original lookup error - return dealID, nil, dealErr + var pubDealsParams market2.PublishStorageDealsParams + if err := pubDealsParams.UnmarshalCBOR(bytes.NewReader(pubmsg.Params)); err != nil { + return dealID, xerrors.Errorf("unmarshalling publish deal message params for message %s: %w", publishCid, err) } - dealID = retval.IDs[0] - marketDeal, err = api.StateMarketStorageDeal(ctx, dealID, ts.Key()) - - if err == nil { - equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal) + // Scan through the deal proposals in the message parameters to find the + // index of the target deal proposal + dealIdx := -1 + for i, paramDeal := range pubDealsParams.Deals { + eq, err := checkDealEquality(ctx, ts, api, proposal, market.DealProposal(paramDeal.Proposal)) if err != nil { - return dealID, nil, err + return dealID, xerrors.Errorf("comparing publish deal message %s proposal to deal proposal: %w", publishCid, err) } - if !equal { - return dealID, nil, xerrors.Errorf("Deal proposals did not match") + if eq { + dealIdx = i + break } } - return dealID, marketDeal, err + + if dealIdx == -1 { + return dealID, xerrors.Errorf("could not find deal in publish deals message %s", publishCid) + } + + if dealIdx >= len(retval.IDs) { + return dealID, xerrors.Errorf("deal index %d out of bounds of deals (len %d) in publish deals message %s", + dealIdx, len(retval.IDs), publishCid) + } + + return retval.IDs[dealIdx], nil } func checkDealEquality(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, p1, p2 market.DealProposal) (bool, error) { diff --git a/markets/storageadapter/getcurrentdealinfo_test.go b/markets/storageadapter/getcurrentdealinfo_test.go index ed5d36c5b8c..62c82d75a2e 100644 --- a/markets/storageadapter/getcurrentdealinfo_test.go +++ b/markets/storageadapter/getcurrentdealinfo_test.go @@ -4,20 +4,25 @@ import ( "bytes" "errors" "math/rand" + "sort" "testing" "time" + "golang.org/x/net/context" + "golang.org/x/xerrors" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/market" test "github.com/filecoin-project/lotus/chain/events/state/mock" "github.com/filecoin-project/lotus/chain/types" + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" + tutils "github.com/filecoin-project/specs-actors/v2/support/testing" "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" - "golang.org/x/net/context" - "golang.org/x/xerrors" ) var errNotFound = errors.New("Could not find") @@ -25,20 +30,29 @@ var errNotFound = errors.New("Could not find") func TestGetCurrentDealInfo(t *testing.T) { ctx := context.Background() dummyCid, _ := cid.Parse("bafkqaaa") - startDealID := abi.DealID(rand.Uint64()) - newDealID := abi.DealID(rand.Uint64()) - twoValuesReturn := makePublishDealsReturnBytes(t, []abi.DealID{abi.DealID(rand.Uint64()), abi.DealID(rand.Uint64())}) - sameValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{startDealID}) - newValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{newDealID}) + dummyCid2, _ := cid.Parse("bafkqaab") + zeroDealID := abi.DealID(0) + earlierDealID := abi.DealID(9) + successDealID := abi.DealID(10) proposal := market.DealProposal{ - PieceCID: dummyCid, - PieceSize: abi.PaddedPieceSize(rand.Uint64()), - Label: "success", + PieceCID: dummyCid, + PieceSize: abi.PaddedPieceSize(100), + Client: tutils.NewActorAddr(t, "client"), + Provider: tutils.NewActorAddr(t, "provider"), + StoragePricePerEpoch: abi.NewTokenAmount(1), + ProviderCollateral: abi.NewTokenAmount(1), + ClientCollateral: abi.NewTokenAmount(1), + Label: "success", } otherProposal := market.DealProposal{ - PieceCID: dummyCid, - PieceSize: abi.PaddedPieceSize(rand.Uint64()), - Label: "other", + PieceCID: dummyCid2, + PieceSize: abi.PaddedPieceSize(100), + Client: tutils.NewActorAddr(t, "client"), + Provider: tutils.NewActorAddr(t, "provider"), + StoragePricePerEpoch: abi.NewTokenAmount(1), + ProviderCollateral: abi.NewTokenAmount(1), + ClientCollateral: abi.NewTokenAmount(1), + Label: "other", } successDeal := &api.MarketDeal{ Proposal: proposal, @@ -47,7 +61,7 @@ func TestGetCurrentDealInfo(t *testing.T) { LastUpdatedEpoch: 2, }, } - otherDeal := &api.MarketDeal{ + earlierDeal := &api.MarketDeal{ Proposal: otherProposal, State: market.DealState{ SectorStartEpoch: 1, @@ -64,125 +78,89 @@ func TestGetCurrentDealInfo(t *testing.T) { expectedError error }{ "deal lookup succeeds": { - marketDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: successDeal, - }, - expectedDealID: startDealID, - expectedMarketDeal: successDeal, - }, - "publish CID = nil": { - expectedDealID: startDealID, - expectedError: errNotFound, - }, - "publish CID = nil, other deal on lookup": { - marketDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: otherDeal, - }, - expectedDealID: startDealID, - expectedError: xerrors.Errorf("Deal proposals did not match"), - }, - "search message fails": { - publishCid: &dummyCid, - searchMessageErr: errors.New("something went wrong"), - expectedDealID: startDealID, - expectedError: errors.New("something went wrong"), - }, - "return code not ok": { - publishCid: &dummyCid, - searchMessageLookup: &api.MsgLookup{ - Receipt: types.MessageReceipt{ - ExitCode: exitcode.ErrIllegalState, - }, - }, - expectedDealID: startDealID, - expectedError: xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", dummyCid, exitcode.ErrIllegalState), - }, - "unable to unmarshal params": { publishCid: &dummyCid, searchMessageLookup: &api.MsgLookup{ Receipt: types.MessageReceipt{ ExitCode: exitcode.Ok, - Return: []byte("applesauce"), + Return: makePublishDealsReturnBytes(t, []abi.DealID{successDealID}), }, }, - expectedDealID: startDealID, - expectedError: xerrors.Errorf("looking for publish deal message: unmarshaling message return: cbor input should be of type array"), - }, - "more than one returned id": { - publishCid: &dummyCid, - searchMessageLookup: &api.MsgLookup{ - Receipt: types.MessageReceipt{ - ExitCode: exitcode.Ok, - Return: twoValuesReturn, - }, + marketDeals: map[abi.DealID]*api.MarketDeal{ + successDealID: successDeal, }, - expectedDealID: startDealID, - expectedError: xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal"), + expectedDealID: successDealID, + expectedMarketDeal: successDeal, }, - "deal ids still match": { + "deal lookup succeeds two return values": { publishCid: &dummyCid, searchMessageLookup: &api.MsgLookup{ Receipt: types.MessageReceipt{ ExitCode: exitcode.Ok, - Return: sameValueReturn, + Return: makePublishDealsReturnBytes(t, []abi.DealID{earlierDealID, successDealID}), }, }, - expectedDealID: startDealID, - expectedError: errNotFound, + marketDeals: map[abi.DealID]*api.MarketDeal{ + earlierDealID: earlierDeal, + successDealID: successDeal, + }, + expectedDealID: successDealID, + expectedMarketDeal: successDeal, }, - "new deal id success": { + "deal lookup fails missing proposal": { publishCid: &dummyCid, searchMessageLookup: &api.MsgLookup{ Receipt: types.MessageReceipt{ ExitCode: exitcode.Ok, - Return: newValueReturn, + Return: makePublishDealsReturnBytes(t, []abi.DealID{earlierDealID}), }, }, marketDeals: map[abi.DealID]*api.MarketDeal{ - newDealID: successDeal, + earlierDealID: earlierDeal, }, - expectedDealID: newDealID, - expectedMarketDeal: successDeal, + expectedDealID: zeroDealID, + expectedError: xerrors.Errorf("could not find deal in publish deals message %s", dummyCid), }, - "new deal id after other deal found": { + "deal lookup fails mismatch count of deals and return values": { publishCid: &dummyCid, searchMessageLookup: &api.MsgLookup{ Receipt: types.MessageReceipt{ ExitCode: exitcode.Ok, - Return: newValueReturn, + Return: makePublishDealsReturnBytes(t, []abi.DealID{earlierDealID}), }, }, marketDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: otherDeal, - newDealID: successDeal, + earlierDealID: earlierDeal, + successDealID: successDeal, }, - expectedDealID: newDealID, - expectedMarketDeal: successDeal, + expectedDealID: zeroDealID, + expectedError: xerrors.Errorf("deal index 1 out of bounds of deals (len 1) in publish deals message %s", dummyCid), + }, + "search message fails": { + publishCid: &dummyCid, + searchMessageErr: errors.New("something went wrong"), + expectedDealID: zeroDealID, + expectedError: xerrors.Errorf("looking for publish deal message %s: search msg failed: something went wrong", dummyCid), }, - "new deal id failure": { + "return code not ok": { publishCid: &dummyCid, searchMessageLookup: &api.MsgLookup{ Receipt: types.MessageReceipt{ - ExitCode: exitcode.Ok, - Return: newValueReturn, + ExitCode: exitcode.ErrIllegalState, }, }, - expectedDealID: newDealID, - expectedError: errNotFound, + expectedDealID: zeroDealID, + expectedError: xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", dummyCid, exitcode.ErrIllegalState), }, - "new deal id, failure due to other deal present": { + "unable to unmarshal params": { publishCid: &dummyCid, searchMessageLookup: &api.MsgLookup{ Receipt: types.MessageReceipt{ ExitCode: exitcode.Ok, - Return: newValueReturn, + Return: []byte("applesauce"), }, }, - marketDeals: map[abi.DealID]*api.MarketDeal{ - newDealID: otherDeal, - }, - expectedDealID: newDealID, - expectedError: xerrors.Errorf("Deal proposals did not match"), + expectedDealID: zeroDealID, + expectedError: xerrors.Errorf("looking for publish deal message %s: unmarshalling message return: cbor input should be of type array", dummyCid), }, } runTestCase := func(testCase string, data struct { @@ -209,7 +187,7 @@ func TestGetCurrentDealInfo(t *testing.T) { MarketDeals: marketDeals, } - dealID, marketDeal, err := GetCurrentDealInfo(ctx, ts, api, startDealID, proposal, data.publishCid) + dealID, marketDeal, err := GetCurrentDealInfo(ctx, ts, api, proposal, data.publishCid) require.Equal(t, data.expectedDealID, dealID) require.Equal(t, data.expectedMarketDeal, marketDeal) if data.expectedError == nil { @@ -236,6 +214,33 @@ type mockGetCurrentDealInfoAPI struct { MarketDeals map[marketDealKey]*api.MarketDeal } +func (mapi *mockGetCurrentDealInfoAPI) ChainGetMessage(ctx context.Context, c cid.Cid) (*types.Message, error) { + var dealIDs []abi.DealID + var deals []market2.ClientDealProposal + for k, dl := range mapi.MarketDeals { + dealIDs = append(dealIDs, k.DealID) + deals = append(deals, market2.ClientDealProposal{ + Proposal: market2.DealProposal(dl.Proposal), + ClientSignature: crypto.Signature{ + Data: []byte("foo bar cat dog"), + Type: crypto.SigTypeBLS, + }, + }) + } + sort.SliceStable(deals, func(i, j int) bool { + return dealIDs[i] < dealIDs[j] + }) + buf := new(bytes.Buffer) + params := market2.PublishStorageDealsParams{Deals: deals} + err := params.MarshalCBOR(buf) + if err != nil { + panic(err) + } + return &types.Message{ + Params: buf.Bytes(), + }, nil +} + func (mapi *mockGetCurrentDealInfoAPI) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, ts types.TipSetKey) (*api.MarketDeal, error) { deal, ok := mapi.MarketDeals[marketDealKey{dealID, ts}] if !ok { @@ -244,8 +249,14 @@ func (mapi *mockGetCurrentDealInfoAPI) StateMarketStorageDeal(ctx context.Contex return deal, nil } -func (mapi *mockGetCurrentDealInfoAPI) StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error) { - return mapi.SearchMessageLookup, mapi.SearchMessageErr +func (mapi *mockGetCurrentDealInfoAPI) StateWaitMsg(ctx context.Context, c cid.Cid, confidence uint64) (*api.MsgLookup, error) { + if mapi.SearchMessageLookup == nil { + return mapi.SearchMessageLookup, mapi.SearchMessageErr + } + + lookup := *mapi.SearchMessageLookup + lookup.Message = c + return &lookup, mapi.SearchMessageErr } func (mapi *mockGetCurrentDealInfoAPI) StateLookupID(ctx context.Context, addr address.Address, ts types.TipSetKey) (address.Address, error) { diff --git a/markets/storageadapter/ondealsectorcommitted.go b/markets/storageadapter/ondealsectorcommitted.go index bd59da7503f..51520f0116d 100644 --- a/markets/storageadapter/ondealsectorcommitted.go +++ b/markets/storageadapter/ondealsectorcommitted.go @@ -21,7 +21,7 @@ type sectorCommittedEventsAPI interface { Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error } -func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorPreCommittedCallback) error { +func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorPreCommittedCallback) error { // Ensure callback is only called once var once sync.Once cb := func(sectorNumber abi.SectorNumber, isActive bool, err error) { @@ -32,7 +32,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev // First check if the deal is already active, and if so, bail out checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) + isActive, err := checkIfDealAlreadyActive(ctx, api, ts, proposal, publishCid) if err != nil { // Note: the error returned from here will end up being returned // from OnDealSectorPreCommitted so no need to call the callback @@ -71,7 +71,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev // If the deal hasn't been activated by the proposed start epoch, the // deal will timeout (when msg == nil it means the timeout epoch was reached) if msg == nil { - err = xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch) + err = xerrors.Errorf("deal with piece CID %s was not activated by proposed deal start epoch %d", proposal.PieceCID, proposal.StartEpoch) return false, err } @@ -86,9 +86,9 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return false, xerrors.Errorf("unmarshal pre commit: %w", err) } - // When the deal is published, the deal ID may change, so get the + // When there is a reorg, the deal ID may change, so get the // current deal ID from the publish message CID - dealID, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) + dealID, _, err := GetCurrentDealInfo(ctx, ts, api, proposal, publishCid) if err != nil { return false, err } @@ -119,7 +119,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return nil } -func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorCommittedCallback) error { +func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorCommittedCallback) error { // Ensure callback is only called once var once sync.Once cb := func(err error) { @@ -130,7 +130,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event // First check if the deal is already active, and if so, bail out checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) + isActive, err := checkIfDealAlreadyActive(ctx, api, ts, proposal, publishCid) if err != nil { // Note: the error returned from here will end up being returned // from OnDealSectorCommitted so no need to call the callback @@ -176,7 +176,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event // If the deal hasn't been activated by the proposed start epoch, the // deal will timeout (when msg == nil it means the timeout epoch was reached) if msg == nil { - err := xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch) + err := xerrors.Errorf("deal with piece CID %s was not activated by proposed deal start epoch %d", proposal.PieceCID, proposal.StartEpoch) return false, err } @@ -186,7 +186,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event } // Get the deal info - _, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) + dealID, sd, err := GetCurrentDealInfo(ctx, ts, api, proposal, publishCid) if err != nil { return false, xerrors.Errorf("failed to look up deal on chain: %w", err) } @@ -216,11 +216,10 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event return nil } -func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (bool, error) { - _, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) +func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, proposal market.DealProposal, publishCid *cid.Cid) (bool, error) { + dealID, sd, err := GetCurrentDealInfo(ctx, ts, api, proposal, publishCid) if err != nil { - // TODO: This may be fine for some errors - return false, xerrors.Errorf("failed to look up deal on chain: %w", err) + return false, xerrors.Errorf("check if deal active: failed to look up deal on chain: %w", err) } // Sector with deal is already active diff --git a/markets/storageadapter/ondealsectorcommitted_test.go b/markets/storageadapter/ondealsectorcommitted_test.go index b74a1e53213..ef894d9505b 100644 --- a/markets/storageadapter/ondealsectorcommitted_test.go +++ b/markets/storageadapter/ondealsectorcommitted_test.go @@ -22,6 +22,7 @@ import ( "github.com/filecoin-project/lotus/chain/events" test "github.com/filecoin-project/lotus/chain/events/state/mock" "github.com/filecoin-project/lotus/chain/types" + tutils "github.com/filecoin-project/specs-actors/v2/support/testing" "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" ) @@ -34,12 +35,16 @@ func TestOnDealSectorPreCommitted(t *testing.T) { pieceCid := generateCids(1)[0] startDealID := abi.DealID(rand.Uint64()) newDealID := abi.DealID(rand.Uint64()) - newValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{newDealID}) sectorNumber := abi.SectorNumber(rand.Uint64()) proposal := market.DealProposal{ - PieceCID: pieceCid, - PieceSize: abi.PaddedPieceSize(rand.Uint64()), - Label: "success", + PieceCID: pieceCid, + PieceSize: abi.PaddedPieceSize(rand.Uint64()), + Client: tutils.NewActorAddr(t, "client"), + Provider: tutils.NewActorAddr(t, "provider"), + StoragePricePerEpoch: abi.NewTokenAmount(1), + ProviderCollateral: abi.NewTokenAmount(1), + ClientCollateral: abi.NewTokenAmount(1), + Label: "success", } unfinishedDeal := &api.MarketDeal{ Proposal: proposal, @@ -69,6 +74,12 @@ func TestOnDealSectorPreCommitted(t *testing.T) { } testCases := map[string]testCase{ "normal sequence": { + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ + ExitCode: exitcode.Ok, + Return: makePublishDealsReturnBytes(t, []abi.DealID{startDealID}), + }, + }, checkTsDeals: map[abi.DealID]*api.MarketDeal{ startDealID: unfinishedDeal, }, @@ -92,7 +103,7 @@ func TestOnDealSectorPreCommitted(t *testing.T) { searchMessageLookup: &api.MsgLookup{ Receipt: types.MessageReceipt{ ExitCode: exitcode.Ok, - Return: newValueReturn, + Return: makePublishDealsReturnBytes(t, []abi.DealID{newDealID}), }, }, checkTsDeals: map[abi.DealID]*api.MarketDeal{ @@ -115,6 +126,12 @@ func TestOnDealSectorPreCommitted(t *testing.T) { expectedCBSectorNumber: sectorNumber, }, "ignores unsuccessful pre-commit message": { + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ + ExitCode: exitcode.Ok, + Return: makePublishDealsReturnBytes(t, []abi.DealID{startDealID}), + }, + }, checkTsDeals: map[abi.DealID]*api.MarketDeal{ startDealID: unfinishedDeal, }, @@ -137,9 +154,15 @@ func TestOnDealSectorPreCommitted(t *testing.T) { checkTsDeals: map[abi.DealID]*api.MarketDeal{}, searchMessageErr: errors.New("something went wrong"), expectedCBCallCount: 0, - expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"), + expectedError: xerrors.Errorf("failed to set up called handler: check if deal active: failed to look up deal on chain: looking for publish deal message %s: search msg failed: something went wrong", publishCid), }, "sector start epoch > 0 in check": { + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ + ExitCode: exitcode.Ok, + Return: makePublishDealsReturnBytes(t, []abi.DealID{startDealID}), + }, + }, checkTsDeals: map[abi.DealID]*api.MarketDeal{ startDealID: successDeal, }, @@ -161,9 +184,8 @@ func TestOnDealSectorPreCommitted(t *testing.T) { deals: map[abi.DealID]*api.MarketDeal{}, }, }, - expectedCBCallCount: 1, - expectedCBError: errors.New("handling applied event: something went wrong"), - expectedError: errors.New("failed to set up called handler: something went wrong"), + expectedCBCallCount: 0, + expectedError: xerrors.Errorf("failed to set up called handler: check if deal active: failed to look up deal on chain: looking for publish deal message %s: search msg failed: something went wrong", publishCid), }, "proposed deal epoch timeout": { checkTsDeals: map[abi.DealID]*api.MarketDeal{ @@ -171,7 +193,7 @@ func TestOnDealSectorPreCommitted(t *testing.T) { }, dealStartEpochTimeout: true, expectedCBCallCount: 1, - expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID), + expectedCBError: xerrors.Errorf("handling applied event: deal with piece CID %s was not activated by proposed deal start epoch 0", unfinishedDeal.Proposal.PieceCID), }, } runTestCase := func(testCase string, data testCase) { @@ -218,7 +240,7 @@ func TestOnDealSectorPreCommitted(t *testing.T) { cbIsActive = isActive cbError = err } - err = OnDealSectorPreCommitted(ctx, api, eventsAPI, provider, startDealID, proposal, &publishCid, cb) + err = OnDealSectorPreCommitted(ctx, api, eventsAPI, provider, proposal, &publishCid, cb) if data.expectedError == nil { require.NoError(t, err) } else { @@ -249,9 +271,14 @@ func TestOnDealSectorCommitted(t *testing.T) { newValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{newDealID}) sectorNumber := abi.SectorNumber(rand.Uint64()) proposal := market.DealProposal{ - PieceCID: pieceCid, - PieceSize: abi.PaddedPieceSize(rand.Uint64()), - Label: "success", + PieceCID: pieceCid, + PieceSize: abi.PaddedPieceSize(rand.Uint64()), + Client: tutils.NewActorAddr(t, "client"), + Provider: tutils.NewActorAddr(t, "provider"), + StoragePricePerEpoch: abi.NewTokenAmount(1), + ProviderCollateral: abi.NewTokenAmount(1), + ClientCollateral: abi.NewTokenAmount(1), + Label: "success", } unfinishedDeal := &api.MarketDeal{ Proposal: proposal, @@ -279,6 +306,12 @@ func TestOnDealSectorCommitted(t *testing.T) { } testCases := map[string]testCase{ "normal sequence": { + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ + ExitCode: exitcode.Ok, + Return: makePublishDealsReturnBytes(t, []abi.DealID{startDealID}), + }, + }, checkTsDeals: map[abi.DealID]*api.MarketDeal{ startDealID: unfinishedDeal, }, @@ -317,6 +350,12 @@ func TestOnDealSectorCommitted(t *testing.T) { expectedCBCallCount: 1, }, "ignores unsuccessful prove-commit message": { + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ + ExitCode: exitcode.Ok, + Return: makePublishDealsReturnBytes(t, []abi.DealID{startDealID}), + }, + }, checkTsDeals: map[abi.DealID]*api.MarketDeal{ startDealID: unfinishedDeal, }, @@ -337,9 +376,15 @@ func TestOnDealSectorCommitted(t *testing.T) { checkTsDeals: map[abi.DealID]*api.MarketDeal{}, searchMessageErr: errors.New("something went wrong"), expectedCBCallCount: 0, - expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"), + expectedError: xerrors.Errorf("failed to set up called handler: check if deal active: failed to look up deal on chain: looking for publish deal message %s: search msg failed: something went wrong", publishCid), }, "sector start epoch > 0 in check": { + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ + ExitCode: exitcode.Ok, + Return: makePublishDealsReturnBytes(t, []abi.DealID{startDealID}), + }, + }, checkTsDeals: map[abi.DealID]*api.MarketDeal{ startDealID: successDeal, }, @@ -347,6 +392,12 @@ func TestOnDealSectorCommitted(t *testing.T) { }, "error on deal in called": { searchMessageErr: errors.New("something went wrong"), + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ + ExitCode: exitcode.Ok, + Return: makePublishDealsReturnBytes(t, []abi.DealID{startDealID}), + }, + }, checkTsDeals: map[abi.DealID]*api.MarketDeal{ startDealID: unfinishedDeal, }, @@ -360,17 +411,21 @@ func TestOnDealSectorCommitted(t *testing.T) { }, }, }, - expectedCBCallCount: 1, - expectedCBError: errors.New("handling applied event: failed to look up deal on chain: something went wrong"), - expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"), + expectedError: xerrors.Errorf("failed to set up called handler: check if deal active: failed to look up deal on chain: looking for publish deal message %s: search msg failed: something went wrong", publishCid), }, "proposed deal epoch timeout": { + searchMessageLookup: &api.MsgLookup{ + Receipt: types.MessageReceipt{ + ExitCode: exitcode.Ok, + Return: makePublishDealsReturnBytes(t, []abi.DealID{startDealID}), + }, + }, checkTsDeals: map[abi.DealID]*api.MarketDeal{ startDealID: unfinishedDeal, }, dealStartEpochTimeout: true, expectedCBCallCount: 1, - expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID), + expectedCBError: xerrors.Errorf("handling applied event: deal with piece CID %s was not activated by proposed deal start epoch 0", unfinishedDeal.Proposal.PieceCID), }, } runTestCase := func(testCase string, data testCase) { @@ -413,7 +468,7 @@ func TestOnDealSectorCommitted(t *testing.T) { cbCallCount++ cbError = err } - err = OnDealSectorCommitted(ctx, api, eventsAPI, provider, startDealID, sectorNumber, proposal, &publishCid, cb) + err = OnDealSectorCommitted(ctx, api, eventsAPI, provider, sectorNumber, proposal, &publishCid, cb) if data.expectedError == nil { require.NoError(t, err) } else { diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index 4ce32d2bfa0..26ac0f5a009 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -22,7 +22,6 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/events/state" @@ -48,56 +47,33 @@ type ProviderNodeAdapter struct { secb *sectorblocks.SectorBlocks ev *events.Events - publishSpec, addBalanceSpec *api.MessageSendSpec - dsMatcher *dealStateMatcher + dealPublisher *DealPublisher + + addBalanceSpec *api.MessageSendSpec + dsMatcher *dealStateMatcher } -func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { - return func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { +func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode { + return func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode { na := &ProviderNodeAdapter{ FullNode: full, - dag: dag, - secb: secb, - ev: events.NewEvents(context.TODO(), full), - dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(full))), + dag: dag, + secb: secb, + ev: events.NewEvents(context.TODO(), full), + dealPublisher: dealPublisher, + dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(full))), } if fc != nil { - na.publishSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxPublishDealsFee)} na.addBalanceSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxMarketBalanceAddFee)} } + return na } } func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemarket.MinerDeal) (cid.Cid, error) { - log.Info("publishing deal") - - mi, err := n.StateMinerInfo(ctx, deal.Proposal.Provider, types.EmptyTSK) - if err != nil { - return cid.Undef, err - } - - params, err := actors.SerializeParams(&market2.PublishStorageDealsParams{ - Deals: []market2.ClientDealProposal{deal.ClientDealProposal}, - }) - - if err != nil { - return cid.Undef, xerrors.Errorf("serializing PublishStorageDeals params failed: %w", err) - } - - // TODO: We may want this to happen after fetching data - smsg, err := n.MpoolPushMessage(ctx, &types.Message{ - To: market.Address, - From: mi.Worker, - Value: types.NewInt(0), - Method: market.Methods.PublishStorageDeals, - Params: params, - }, n.publishSpec) - if err != nil { - return cid.Undef, err - } - return smsg.Cid(), nil + return n.dealPublisher.Publish(ctx, deal.ClientDealProposal) } func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize abi.UnpaddedPieceSize, pieceData io.Reader) (*storagemarket.PackingResult, error) { @@ -271,11 +247,11 @@ func (n *ProviderNodeAdapter) DealProviderCollateralBounds(ctx context.Context, } func (n *ProviderNodeAdapter) OnDealSectorPreCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error { - return OnDealSectorPreCommitted(ctx, n, n.ev, provider, dealID, market.DealProposal(proposal), publishCid, cb) + return OnDealSectorPreCommitted(ctx, n, n.ev, provider, market.DealProposal(proposal), publishCid, cb) } func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error { - return OnDealSectorCommitted(ctx, n, n.ev, provider, dealID, sectorNumber, market.DealProposal(proposal), publishCid, cb) + return OnDealSectorCommitted(ctx, n, n.ev, provider, sectorNumber, market.DealProposal(proposal), publishCid, cb) } func (n *ProviderNodeAdapter) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) { diff --git a/node/builder.go b/node/builder.go index 8ee9b367440..440ab6df3ba 100644 --- a/node/builder.go +++ b/node/builder.go @@ -377,6 +377,7 @@ func Online() Option { Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)), Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)), Override(new(storagemarket.StorageProvider), modules.StorageProvider), + Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, nil)), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)), Override(HandleMigrateProviderFundsKey, modules.HandleMigrateProviderFunds), Override(HandleRetrievalKey, modules.HandleRetrieval), @@ -519,6 +520,7 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.RetrievalFilter))), ), + Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, &cfg.PublishMsg)), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees)), Override(new(sectorstorage.SealerConfig), cfg.Storage), @@ -646,5 +648,6 @@ func Test() Option { Unset(RunPeerMgrKey), Unset(new(*peermgr.PeerMgr)), Override(new(beacon.Schedule), testing.RandomBeacon), + Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, nil)), ) } diff --git a/node/config/def.go b/node/config/def.go index 68371c3842a..6301324014a 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -33,6 +33,7 @@ type StorageMiner struct { Common Dealmaking DealmakingConfig + PublishMsg PublishMsgConfig Sealing SealingConfig Storage sectorstorage.SealerConfig Fees MinerFeeConfig @@ -53,6 +54,15 @@ type DealmakingConfig struct { RetrievalFilter string } +type PublishMsgConfig struct { + // The amount of time to wait for more deals to arrive before + // publishing + PublishPeriod Duration + // The maximum number of deals to include in a single PublishStorageDeals + // message + MaxDealsPerMsg uint64 +} + type SealingConfig struct { // 0 = no limit MaxWaitDealsSectors uint64 @@ -208,6 +218,11 @@ func DefaultStorageMiner() *StorageMiner { ExpectedSealDuration: Duration(time.Hour * 24), }, + PublishMsg: PublishMsgConfig{ + PublishPeriod: Duration(time.Hour), + MaxDealsPerMsg: 8, + }, + Fees: MinerFeeConfig{ MaxPreCommitGasFee: types.MustParseFIL("0.025"), MaxCommitGasFee: types.MustParseFIL("0.05"), diff --git a/node/node_test.go b/node/node_test.go index 0baa047dafd..e23adb6ed1f 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -5,14 +5,12 @@ import ( "testing" "time" - builder "github.com/filecoin-project/lotus/node/test" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/lib/lotuslog" - logging "github.com/ipfs/go-log/v2" - "github.com/filecoin-project/lotus/api/test" "github.com/filecoin-project/lotus/chain/actors/policy" + "github.com/filecoin-project/lotus/lib/lotuslog" + builder "github.com/filecoin-project/lotus/node/test" + logging "github.com/ipfs/go-log/v2" ) func init() { @@ -57,6 +55,9 @@ func TestAPIDealFlow(t *testing.T) { t.Run("TestFastRetrievalDealFlow", func(t *testing.T) { test.TestFastRetrievalDealFlow(t, builder.MockSbBuilder, blockTime, dealStartEpoch) }) + t.Run("TestPublishDealsBatching", func(t *testing.T) { + test.TestPublishDealsBatching(t, builder.MockSbBuilder, blockTime, dealStartEpoch) + }) } func TestAPIDealFlowReal(t *testing.T) { diff --git a/node/test/builder.go b/node/test/builder.go index f6599cf23cd..94faa474145 100644 --- a/node/test/builder.go +++ b/node/test/builder.go @@ -288,7 +288,11 @@ func mockBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []test. genMiner := maddrs[i] wa := genms[i].Worker - storers[i] = CreateTestStorageNode(ctx, t, wa, genMiner, pk, f, mn, node.Options()) + opts := def.Opts + if opts == nil { + opts = node.Options() + } + storers[i] = CreateTestStorageNode(ctx, t, wa, genMiner, pk, f, mn, opts) if err := storers[i].StorageAddLocal(ctx, presealDirs[i]); err != nil { t.Fatalf("%+v", err) } @@ -455,12 +459,17 @@ func mockSbBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []tes } } + opts := def.Opts + if opts == nil { + opts = node.Options() + } storers[i] = CreateTestStorageNode(ctx, t, genms[i].Worker, maddrs[i], pidKeys[i], f, mn, node.Options( node.Override(new(sectorstorage.SectorManager), func() (sectorstorage.SectorManager, error) { return mock.NewMockSectorMgr(sectors), nil }), node.Override(new(ffiwrapper.Verifier), mock.MockVerifier), node.Unset(new(*sectorstorage.Manager)), + opts, )) if rpc {