From 8bd6d0019d143855afd6a0c7339dc75516bf2513 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Thu, 6 May 2021 14:57:57 +0530 Subject: [PATCH] changes for dynamic pricing --- .../impl/askstore/askstore_impl.go | 129 ++++++++++++++ .../impl/askstore/askstore_impl_test.go | 75 ++++++++ retrievalmarket/impl/integration_test.go | 6 +- retrievalmarket/impl/provider.go | 66 +++++-- retrievalmarket/impl/provider_environments.go | 11 +- retrievalmarket/impl/provider_test.go | 162 ++++++++++++------ .../testnodes/test_retrieval_provider_node.go | 25 ++- retrievalmarket/nodes.go | 2 +- retrievalmarket/provider.go | 12 ++ .../storage_retrieval_integration_test.go | 2 +- retrievalmarket/types.go | 19 +- 11 files changed, 428 insertions(+), 81 deletions(-) create mode 100644 retrievalmarket/impl/askstore/askstore_impl.go create mode 100644 retrievalmarket/impl/askstore/askstore_impl_test.go diff --git a/retrievalmarket/impl/askstore/askstore_impl.go b/retrievalmarket/impl/askstore/askstore_impl.go new file mode 100644 index 00000000..857e06b5 --- /dev/null +++ b/retrievalmarket/impl/askstore/askstore_impl.go @@ -0,0 +1,129 @@ +package askstore + +import ( + "bytes" + "context" + "sync" + + "github.com/ipfs/go-datastore" + "golang.org/x/xerrors" + + cborutil "github.com/filecoin-project/go-cbor-util" + versioning "github.com/filecoin-project/go-ds-versioning/pkg" + versionedds "github.com/filecoin-project/go-ds-versioning/pkg/datastore" + + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/go-fil-markets/retrievalmarket/migrations" +) + +// AskStoreImpl implements AskStore, persisting a retrieval Ask +// to disk. It also maintains a cache of the current Ask in memory +type AskStoreImpl struct { + lk sync.RWMutex + ask *retrievalmarket.Ask + ds datastore.Batching + key datastore.Key +} + +// NewAskStore returns a new instance of AskStoreImpl +// It will initialize a new default ask and store it if one is not set. +// Otherwise it loads the current Ask from disk +func NewAskStore(ds datastore.Batching, key datastore.Key) (*AskStoreImpl, error) { + askMigrations, err := migrations.AskMigrations.Build() + if err != nil { + return nil, err + } + versionedDs, migrateDs := versionedds.NewVersionedDatastore(ds, askMigrations, versioning.VersionKey("1")) + err = migrateDs(context.TODO()) + if err != nil { + return nil, err + } + s := &AskStoreImpl{ + ds: versionedDs, + key: key, + } + + if err := s.tryLoadAsk(); err != nil { + return nil, err + } + + if s.ask == nil { + // for now set a default retrieval ask + defaultAsk := &retrievalmarket.Ask{ + PricePerByte: retrievalmarket.DefaultPricePerByte, + UnsealPrice: retrievalmarket.DefaultUnsealPrice, + PaymentInterval: retrievalmarket.DefaultPaymentInterval, + PaymentIntervalIncrease: retrievalmarket.DefaultPaymentIntervalIncrease, + } + + if err := s.SetAsk(defaultAsk); err != nil { + return nil, xerrors.Errorf("failed setting a default retrieval ask: %w", err) + } + } + return s, nil +} + +// SetAsk stores retrieval provider's ask +func (s *AskStoreImpl) SetAsk(ask *retrievalmarket.Ask) error { + s.lk.Lock() + defer s.lk.Unlock() + + return s.saveAsk(ask) +} + +// GetAsk returns the current retrieval ask, or nil if one does not exist. +func (s *AskStoreImpl) GetAsk() *retrievalmarket.Ask { + s.lk.RLock() + defer s.lk.RUnlock() + if s.ask == nil { + return nil + } + ask := *s.ask + return &ask +} + +func (s *AskStoreImpl) tryLoadAsk() error { + s.lk.Lock() + defer s.lk.Unlock() + + err := s.loadAsk() + + if err != nil { + if xerrors.Is(err, datastore.ErrNotFound) { + // this is expected + return nil + } + return err + } + + return nil +} + +func (s *AskStoreImpl) loadAsk() error { + askb, err := s.ds.Get(s.key) + if err != nil { + return xerrors.Errorf("failed to load most recent retrieval ask from disk: %w", err) + } + + var ask retrievalmarket.Ask + if err := cborutil.ReadCborRPC(bytes.NewReader(askb), &ask); err != nil { + return err + } + + s.ask = &ask + return nil +} + +func (s *AskStoreImpl) saveAsk(a *retrievalmarket.Ask) error { + b, err := cborutil.Dump(a) + if err != nil { + return err + } + + if err := s.ds.Put(s.key, b); err != nil { + return err + } + + s.ask = a + return nil +} diff --git a/retrievalmarket/impl/askstore/askstore_impl_test.go b/retrievalmarket/impl/askstore/askstore_impl_test.go new file mode 100644 index 00000000..296ab2c8 --- /dev/null +++ b/retrievalmarket/impl/askstore/askstore_impl_test.go @@ -0,0 +1,75 @@ +package askstore_test + +import ( + "bytes" + "math/rand" + "testing" + + "github.com/ipfs/go-datastore" + dss "github.com/ipfs/go-datastore/sync" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/askstore" + "github.com/filecoin-project/go-fil-markets/retrievalmarket/migrations" +) + +func TestAskStoreImpl(t *testing.T) { + ds := dss.MutexWrap(datastore.NewMapDatastore()) + store, err := askstore.NewAskStore(ds, datastore.NewKey("retrieval-ask")) + require.NoError(t, err) + + // A new store returns the default ask + ask := store.GetAsk() + require.NotNil(t, ask) + + require.Equal(t, retrievalmarket.DefaultUnsealPrice, ask.UnsealPrice) + require.Equal(t, retrievalmarket.DefaultPricePerByte, ask.PricePerByte) + require.Equal(t, retrievalmarket.DefaultPaymentInterval, ask.PaymentInterval) + require.Equal(t, retrievalmarket.DefaultPaymentIntervalIncrease, ask.PaymentIntervalIncrease) + + // Store a new ask + newAsk := &retrievalmarket.Ask{ + PricePerByte: abi.NewTokenAmount(123), + UnsealPrice: abi.NewTokenAmount(456), + PaymentInterval: 789, + PaymentIntervalIncrease: 789, + } + err = store.SetAsk(newAsk) + require.NoError(t, err) + + // Fetch new ask + stored := store.GetAsk() + require.Equal(t, newAsk, stored) + + // Construct a new AskStore and make sure it returns the previously-stored ask + newStore, err := askstore.NewAskStore(ds, datastore.NewKey("retrieval-ask")) + require.NoError(t, err) + stored = newStore.GetAsk() + require.Equal(t, newAsk, stored) +} +func TestMigrations(t *testing.T) { + ds := dss.MutexWrap(datastore.NewMapDatastore()) + oldAsk := &migrations.Ask0{ + PricePerByte: abi.NewTokenAmount(rand.Int63()), + UnsealPrice: abi.NewTokenAmount(rand.Int63()), + PaymentInterval: rand.Uint64(), + PaymentIntervalIncrease: rand.Uint64(), + } + buf := new(bytes.Buffer) + err := oldAsk.MarshalCBOR(buf) + require.NoError(t, err) + ds.Put(datastore.NewKey("retrieval-ask"), buf.Bytes()) + newStore, err := askstore.NewAskStore(ds, datastore.NewKey("retrieval-ask")) + require.NoError(t, err) + ask := newStore.GetAsk() + expectedAsk := &retrievalmarket.Ask{ + PricePerByte: oldAsk.PricePerByte, + UnsealPrice: oldAsk.UnsealPrice, + PaymentInterval: oldAsk.PaymentInterval, + PaymentIntervalIncrease: oldAsk.PaymentIntervalIncrease, + } + require.Equal(t, expectedAsk, ask) +} diff --git a/retrievalmarket/impl/integration_test.go b/retrievalmarket/impl/integration_test.go index 972cba32..2aa2e770 100644 --- a/retrievalmarket/impl/integration_test.go +++ b/retrievalmarket/impl/integration_test.go @@ -157,7 +157,7 @@ func requireSetupTestClientAndProvider(ctx context.Context, t *testing.T, payChA require.NoError(t, err) providerDs := namespace.Wrap(testData.Ds2, datastore.NewKey("/retrievals/provider")) - priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.DealPricingParams) (retrievalmarket.Ask, error) { + priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { ask := retrievalmarket.Ask{} ask.PaymentInterval = expectedQR.MaxPaymentInterval ask.PaymentIntervalIncrease = expectedQR.MaxPaymentIntervalIncrease @@ -388,12 +388,14 @@ func TestClientCanMakeDealWithProvider(t *testing.T) { PieceCID: tut.GenerateCids(1)[0], Deals: []piecestore.DealInfo{ { + DealID: abi.DealID(100), SectorID: sectorID, Offset: offset, Length: abi.UnpaddedPieceSize(len(carData)).Padded(), }, }, } + providerNode.ExpectPricingParams(pieceInfo.PieceCID, []abi.DealID{100}) if testCase.failsUnseal { providerNode.ExpectFailedUnseal(sectorID, offset.Unpadded(), abi.UnpaddedPieceSize(len(carData))) } else { @@ -669,7 +671,7 @@ func setupProvider( opts = append(opts, retrievalimpl.DisableNewDeals()) } - priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.DealPricingParams) (retrievalmarket.Ask, error) { + priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { ask := retrievalmarket.Ask{} ask.PaymentInterval = expectedQR.MaxPaymentInterval ask.PaymentIntervalIncrease = expectedQR.MaxPaymentIntervalIncrease diff --git a/retrievalmarket/impl/provider.go b/retrievalmarket/impl/provider.go index 8468e36c..4ea7b9c1 100644 --- a/retrievalmarket/impl/provider.go +++ b/retrievalmarket/impl/provider.go @@ -7,26 +7,26 @@ import ( "github.com/hannahhoward/go-pubsub" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" - "github.com/libp2p/go-libp2p-core/peer" + "github.com/ipfs/go-datastore/namespace" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer" versioning "github.com/filecoin-project/go-ds-versioning/pkg" versionedfsm "github.com/filecoin-project/go-ds-versioning/pkg/fsm" - "github.com/filecoin-project/go-multistore" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-state-types/big" - "github.com/filecoin-project/go-statemachine/fsm" - "github.com/filecoin-project/go-fil-markets/piecestore" "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/askstore" "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/dtutils" "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/providerstates" "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/requestvalidation" "github.com/filecoin-project/go-fil-markets/retrievalmarket/migrations" rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network" "github.com/filecoin-project/go-fil-markets/shared" + "github.com/filecoin-project/go-multistore" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/go-statemachine/fsm" ) // RetrievalProviderOption is a function that configures a retrieval provider @@ -35,7 +35,7 @@ type RetrievalProviderOption func(p *Provider) // DealDecider is a function that makes a decision about whether to accept a deal type DealDecider func(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error) -type DealPricingFunc func(ctx context.Context, dealPricingParams retrievalmarket.DealPricingParams) (retrievalmarket.Ask, error) +type DealPricingFunc func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) // Provider is the production implementation of the RetrievalProvider interface type Provider struct { @@ -52,6 +52,7 @@ type Provider struct { stateMachines fsm.Group migrateStateMachines func(context.Context) error dealDecider DealDecider + askStore retrievalmarket.AskStore disableNewDeals bool dealPricingFunc DealPricingFunc } @@ -123,6 +124,12 @@ func NewProvider(minerAddress address.Address, return nil, err } + askStore, err := askstore.NewAskStore(namespace.Wrap(ds, datastore.NewKey("retrieval-ask")), datastore.NewKey("latest")) + if err != nil { + return nil, err + } + p.askStore = askStore + retrievalMigrations, err := migrations.ProviderMigrations.Build() if err != nil { return nil, err @@ -231,6 +238,21 @@ func (p *Provider) SubscribeToEvents(subscriber retrievalmarket.ProviderSubscrib return retrievalmarket.Unsubscribe(p.subscribers.Subscribe(subscriber)) } +// GetAsk returns the current deal parameters this provider accepts +func (p *Provider) GetAsk() *retrievalmarket.Ask { + return p.askStore.GetAsk() +} + +// SetAsk sets the deal parameters this provider accepts +func (p *Provider) SetAsk(ask *retrievalmarket.Ask) { + + err := p.askStore.SetAsk(ask) + + if err != nil { + log.Warnf("Error setting retrieval ask: %w", err) + } +} + // ListDeals lists all known retrieval deals func (p *Provider) ListDeals() map[retrievalmarket.ProviderDealIdentifier]retrievalmarket.ProviderDealState { var deals []retrievalmarket.ProviderDealState @@ -314,7 +336,16 @@ func (p *Provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) { } } - ask, err := p.GetAsk(ctx, storageDeals, isUnsealed, stream.RemotePeer()) + input := retrievalmarket.PricingInput{ + // piece from which the payload will be retrieved + // If user hasn't given a PieceCID, we try to choose an unsealed piece in the call to `getPieceInfoFromCid` above. + PieceCID: pieceInfo.PieceCID, + + PayloadCID: query.PayloadCID, + Unsealed: isUnsealed, + Client: stream.RemotePeer(), + } + ask, err := p.GetDynamicAsk(ctx, input, storageDeals) if err != nil { log.Errorf("Retrieval query: GetAsk: %s", err) return @@ -339,14 +370,23 @@ func (p *Provider) HandleQueryStream(stream rmnet.RetrievalQueryStream) { } } -func (p *Provider) GetAsk(ctx context.Context, storageDeals []abi.DealID, isUnsealed bool, client peer.ID) (retrievalmarket.Ask, error) { - // TODO How do we fetch the fast-retrieval flag here ? - dp, err := p.node.GetDealPricingParams(ctx, storageDeals) +func (p *Provider) GetDynamicAsk(ctx context.Context, input retrievalmarket.PricingInput, storageDeals []abi.DealID) (retrievalmarket.Ask, error) { + dp, err := p.node.GetRetrievalPricingInput(ctx, input.PieceCID, storageDeals) if err != nil { return retrievalmarket.Ask{}, xerrors.Errorf("GetDealPricingParams: %s", err) } - dp.Unsealed = isUnsealed - dp.Client = client + // currAsk cannot be nil as we initialize the ask store with a default ask. + // Users can then change the values in the ask store using SetAsk but not remove it. + currAsk := p.GetAsk() + if currAsk == nil { + return retrievalmarket.Ask{}, xerrors.New("no ask configured in ask-store") + } + + dp.PayloadCID = input.PayloadCID + dp.PieceCID = input.PieceCID + dp.Unsealed = input.Unsealed + dp.Client = input.Client + dp.CurrentAsk = *currAsk ask, err := p.dealPricingFunc(ctx, dp) if err != nil { diff --git a/retrievalmarket/impl/provider_environments.go b/retrievalmarket/impl/provider_environments.go index cf4bf6d5..f714606d 100644 --- a/retrievalmarket/impl/provider_environments.go +++ b/retrievalmarket/impl/provider_environments.go @@ -50,7 +50,16 @@ func (pve *providerValidationEnvironment) GetAsk(ctx context.Context, payloadCid } } - return pve.p.GetAsk(ctx, storageDeals, isUnsealed, client) + input := retrievalmarket.PricingInput{ + // piece from which the payload will be retrieved + PieceCID: piece.PieceCID, + + PayloadCID: payloadCid, + Unsealed: isUnsealed, + Client: client, + } + + return pve.p.GetDynamicAsk(ctx, input, storageDeals) } func (pve *providerValidationEnvironment) GetPiece(c cid.Cid, pieceCID *cid.Cid) (piecestore.PieceInfo, bool, error) { diff --git a/retrievalmarket/impl/provider_test.go b/retrievalmarket/impl/provider_test.go index 3db97267..fdd568ac 100644 --- a/retrievalmarket/impl/provider_test.go +++ b/retrievalmarket/impl/provider_test.go @@ -110,45 +110,43 @@ func TestDynamicPricing(t *testing.T) { }, } - receiveStreamOnProvider := func(t *testing.T, node *testnodes.TestRetrievalProviderNode, qs network.RetrievalQueryStream, pieceStore piecestore.PieceStore) { - ds := dss.MutexWrap(datastore.NewMapDatastore()) - multiStore, err := multistore.NewMultiDstore(ds) - require.NoError(t, err) - dt := tut.NewTestDataTransfer() - net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{}) - - priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.DealPricingParams) (retrievalmarket.Ask, error) { - ask := retrievalmarket.Ask{} - - if dealPricingParams.VerifiedDeal { - ask.PricePerByte = expectedppbVerified - } else { - ask.PricePerByte = expectedppbUnVerified - } + dPriceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { + ask := retrievalmarket.Ask{} - if dealPricingParams.Unsealed { - ask.UnsealPrice = expectedUnsealDiscount - } else { - ask.UnsealPrice = expectedUnsealPrice - } + if dealPricingParams.VerifiedDeal { + ask.PricePerByte = expectedppbVerified + } else { + ask.PricePerByte = expectedppbUnVerified + } - fmt.Println("\n client is", dealPricingParams.Client.String()) - if dealPricingParams.Client == peer2 { - ask.PaymentInterval = expectedpiPeer2 - } else { - ask.PaymentInterval = expectedpiPeer1 - } - ask.PaymentIntervalIncrease = expectedPaymentIntervalIncrease + if dealPricingParams.Unsealed { + ask.UnsealPrice = expectedUnsealDiscount + } else { + ask.UnsealPrice = expectedUnsealPrice + } - return ask, nil + fmt.Println("\n client is", dealPricingParams.Client.String()) + if dealPricingParams.Client == peer2 { + ask.PaymentInterval = expectedpiPeer2 + } else { + ask.PaymentInterval = expectedpiPeer1 } + ask.PaymentIntervalIncrease = expectedPaymentIntervalIncrease - c, err := retrievalimpl.NewProvider(expectedAddress, node, net, pieceStore, multiStore, dt, ds, priceFunc) + return ask, nil + } + + buildProvider := func(t *testing.T, node *testnodes.TestRetrievalProviderNode, qs network.RetrievalQueryStream, + pieceStore piecestore.PieceStore, net *tut.TestRetrievalMarketNetwork, pFnc retrievalimpl.DealPricingFunc) retrievalmarket.RetrievalProvider { + ds := dss.MutexWrap(datastore.NewMapDatastore()) + multiStore, err := multistore.NewMultiDstore(ds) require.NoError(t, err) + dt := tut.NewTestDataTransfer() + c, err := retrievalimpl.NewProvider(expectedAddress, node, net, pieceStore, multiStore, dt, ds, pFnc) + require.NoError(t, err) tut.StartAndWaitForReady(ctx, t, c) - - net.ReceiveQueryStream(qs) + return c } readWriteQueryStream := func() *tut.TestRetrievalQueryStream { @@ -164,10 +162,13 @@ func TestDynamicPricing(t *testing.T) { } tcs := map[string]struct { - query retrievalmarket.Query - expFunc func(t *testing.T, pieceStore *tut.TestPieceStore) - nodeFunc func(n *testnodes.TestRetrievalProviderNode) - peerIdFnc func(stream *tut.TestRetrievalQueryStream) + query retrievalmarket.Query + expFunc func(t *testing.T, pieceStore *tut.TestPieceStore) + nodeFunc func(n *testnodes.TestRetrievalProviderNode) + peerIdFnc func(stream *tut.TestRetrievalQueryStream) + providerFnc func(provider retrievalmarket.RetrievalProvider) + + pricingFnc retrievalimpl.DealPricingFunc expectedPricePerByte abi.TokenAmount expectedPaymentInterval uint64 @@ -182,13 +183,15 @@ func TestDynamicPricing(t *testing.T) { qs.SetRemotePeer(peer1) }, nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { - n.ExpectPricingParamDeals([]abi.DealID{1, 11, 2, 22, 222}) + n.ExpectPricingParams(expectedPieceCID1, []abi.DealID{1, 11, 2, 22, 222}) }, expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { pieceStore.ExpectCID(payloadCID, expectedCIDInfo) pieceStore.ExpectPiece(expectedPieceCID1, piece1) pieceStore.ExpectPiece(expectedPieceCID2, piece2) }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, expectedPricePerByte: expectedppbUnVerified, expectedPaymentInterval: expectedpiPeer1, @@ -203,13 +206,15 @@ func TestDynamicPricing(t *testing.T) { qs.SetRemotePeer(peer2) }, nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { - n.ExpectPricingParamDeals([]abi.DealID{1, 11, 2, 22, 222}) + n.ExpectPricingParams(expectedPieceCID1, []abi.DealID{1, 11, 2, 22, 222}) }, expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { pieceStore.ExpectCID(payloadCID, expectedCIDInfo) pieceStore.ExpectPiece(expectedPieceCID1, piece1) pieceStore.ExpectPiece(expectedPieceCID2, piece2) }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, expectedPricePerByte: expectedppbUnVerified, expectedPaymentInterval: expectedpiPeer2, @@ -225,13 +230,15 @@ func TestDynamicPricing(t *testing.T) { }, nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { n.MarkVerified() - n.ExpectPricingParamDeals([]abi.DealID{1, 11, 2, 22, 222}) + n.ExpectPricingParams(expectedPieceCID1, []abi.DealID{1, 11, 2, 22, 222}) }, expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { pieceStore.ExpectCID(payloadCID, expectedCIDInfo) pieceStore.ExpectPiece(expectedPieceCID1, piece1) pieceStore.ExpectPiece(expectedPieceCID2, piece2) }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, expectedPricePerByte: expectedppbVerified, expectedPaymentInterval: expectedpiPeer1, @@ -248,13 +255,15 @@ func TestDynamicPricing(t *testing.T) { nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { p := piece2.Deals[0] n.MarkUnsealed(context.TODO(), p.SectorID, p.Offset.Unpadded(), p.Length.Unpadded()) - n.ExpectPricingParamDeals([]abi.DealID{1, 11, 2, 22, 222}) + n.ExpectPricingParams(expectedPieceCID2, []abi.DealID{1, 11, 2, 22, 222}) }, expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { pieceStore.ExpectCID(payloadCID, expectedCIDInfo) pieceStore.ExpectPiece(expectedPieceCID1, piece1) pieceStore.ExpectPiece(expectedPieceCID2, piece2) }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, expectedPricePerByte: expectedppbUnVerified, expectedPaymentInterval: expectedpiPeer1, @@ -272,13 +281,15 @@ func TestDynamicPricing(t *testing.T) { p := piece2.Deals[0] n.MarkUnsealed(context.TODO(), p.SectorID, p.Offset.Unpadded(), p.Length.Unpadded()) n.MarkVerified() - n.ExpectPricingParamDeals([]abi.DealID{1, 11, 2, 22, 222}) + n.ExpectPricingParams(expectedPieceCID2, []abi.DealID{1, 11, 2, 22, 222}) }, expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { pieceStore.ExpectCID(payloadCID, expectedCIDInfo) pieceStore.ExpectPiece(expectedPieceCID1, piece1) pieceStore.ExpectPiece(expectedPieceCID2, piece2) }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, expectedPricePerByte: expectedppbVerified, expectedPaymentInterval: expectedpiPeer1, @@ -287,7 +298,7 @@ func TestDynamicPricing(t *testing.T) { expectedSize: piece2Size, }, - // Retrieval request for a payloadCid inside a specific Cid + // Retrieval requests for a payloadCid inside a specific piece Cid "specific sealed piece Cid, first piece Cid matches: quote correct price for sealed, unverified, peer1": { query: retrievalmarket.Query{ PayloadCID: payloadCID, @@ -299,12 +310,14 @@ func TestDynamicPricing(t *testing.T) { nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { p := piece2.Deals[0] n.MarkUnsealed(context.TODO(), p.SectorID, p.Offset.Unpadded(), p.Length.Unpadded()) - n.ExpectPricingParamDeals([]abi.DealID{1, 11}) + n.ExpectPricingParams(expectedPieceCID1, []abi.DealID{1, 11}) }, expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { pieceStore.ExpectCID(payloadCID, expectedCIDInfo) pieceStore.ExpectPiece(expectedPieceCID1, piece1) }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, expectedPricePerByte: expectedppbUnVerified, expectedPaymentInterval: expectedpiPeer1, @@ -324,13 +337,15 @@ func TestDynamicPricing(t *testing.T) { nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { p := piece1.Deals[0] n.MarkUnsealed(context.TODO(), p.SectorID, p.Offset.Unpadded(), p.Length.Unpadded()) - n.ExpectPricingParamDeals([]abi.DealID{2, 22, 222}) + n.ExpectPricingParams(expectedPieceCID2, []abi.DealID{2, 22, 222}) }, expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { pieceStore.ExpectCID(payloadCID, expectedCIDInfo) pieceStore.ExpectPiece(expectedPieceCID1, piece1) pieceStore.ExpectPiece(expectedPieceCID1, piece2) }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, expectedPricePerByte: expectedppbUnVerified, expectedPaymentInterval: expectedpiPeer1, @@ -350,13 +365,15 @@ func TestDynamicPricing(t *testing.T) { nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { p := piece2.Deals[0] n.MarkUnsealed(context.TODO(), p.SectorID, p.Offset.Unpadded(), p.Length.Unpadded()) - n.ExpectPricingParamDeals([]abi.DealID{1, 11}) + n.ExpectPricingParams(expectedPieceCID1, []abi.DealID{1, 11}) n.MarkVerified() }, expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { pieceStore.ExpectCID(payloadCID, expectedCIDInfo) pieceStore.ExpectPiece(expectedPieceCID1, piece1) }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, expectedPricePerByte: expectedppbVerified, expectedPaymentInterval: expectedpiPeer1, @@ -377,12 +394,14 @@ func TestDynamicPricing(t *testing.T) { p := piece1.Deals[0] n.MarkUnsealed(context.TODO(), p.SectorID, p.Offset.Unpadded(), p.Length.Unpadded()) n.MarkVerified() - n.ExpectPricingParamDeals([]abi.DealID{1, 11}) + n.ExpectPricingParams(expectedPieceCID1, []abi.DealID{1, 11}) }, expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { pieceStore.ExpectCID(payloadCID, expectedCIDInfo) pieceStore.ExpectPiece(expectedPieceCID1, piece1) }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, expectedPricePerByte: expectedppbVerified, expectedPaymentInterval: expectedpiPeer1, @@ -403,13 +422,15 @@ func TestDynamicPricing(t *testing.T) { p := piece2.Deals[0] n.MarkUnsealed(context.TODO(), p.SectorID, p.Offset.Unpadded(), p.Length.Unpadded()) n.MarkVerified() - n.ExpectPricingParamDeals([]abi.DealID{2, 22, 222}) + n.ExpectPricingParams(expectedPieceCID2, []abi.DealID{2, 22, 222}) }, expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { pieceStore.ExpectCID(payloadCID, expectedCIDInfo) pieceStore.ExpectPiece(expectedPieceCID1, piece1) pieceStore.ExpectPiece(expectedPieceCID2, piece2) }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) {}, + pricingFnc: dPriceFunc, expectedPricePerByte: expectedppbVerified, expectedPaymentInterval: expectedpiPeer2, @@ -417,6 +438,40 @@ func TestDynamicPricing(t *testing.T) { expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, expectedSize: piece2Size, }, + "pieceCid no-op: quote correct price for sealed, unverified, peer1 based on a pre-existing ask": { + query: retrievalmarket.Query{PayloadCID: payloadCID}, + peerIdFnc: func(qs *tut.TestRetrievalQueryStream) { + qs.SetRemotePeer(peer1) + }, + nodeFunc: func(n *testnodes.TestRetrievalProviderNode) { + n.ExpectPricingParams(expectedPieceCID1, []abi.DealID{1, 11, 2, 22, 222}) + }, + expFunc: func(t *testing.T, pieceStore *tut.TestPieceStore) { + pieceStore.ExpectCID(payloadCID, expectedCIDInfo) + pieceStore.ExpectPiece(expectedPieceCID1, piece1) + pieceStore.ExpectPiece(expectedPieceCID2, piece2) + }, + providerFnc: func(provider retrievalmarket.RetrievalProvider) { + ask := provider.GetAsk() + ask.PricePerByte = expectedppbUnVerified + ask.UnsealPrice = expectedUnsealPrice + provider.SetAsk(ask) + }, + pricingFnc: func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { + ask, _ := dPriceFunc(ctx, dealPricingParams) + ppb := big.Add(ask.PricePerByte, dealPricingParams.CurrentAsk.PricePerByte) + unseal := big.Add(ask.UnsealPrice, dealPricingParams.CurrentAsk.UnsealPrice) + ask.PricePerByte = ppb + ask.UnsealPrice = unseal + return ask, nil + }, + + expectedPricePerByte: big.Mul(expectedppbUnVerified, big.NewInt(2)), + expectedPaymentInterval: expectedpiPeer1, + expectedUnsealPrice: big.Mul(expectedUnsealPrice, big.NewInt(2)), + expectedPaymentIntervalIncrease: expectedPaymentIntervalIncrease, + expectedSize: piece1Size, + }, } for name, tc := range tcs { @@ -430,9 +485,14 @@ func TestDynamicPricing(t *testing.T) { pieceStore := tut.NewTestPieceStore() tc.nodeFunc(node) tc.expFunc(t, pieceStore) - receiveStreamOnProvider(t, node, qs, pieceStore) + + net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{}) + p := buildProvider(t, node, qs, pieceStore, net, tc.pricingFnc) + tc.providerFnc(p) + net.ReceiveQueryStream(qs) actualResp, err := qs.ReadQueryResponse() + require.NoError(t, err) pieceStore.VerifyExpectations(t) node.VerifyExpectations(t) @@ -514,7 +574,7 @@ func TestHandleQueryStream(t *testing.T) { dt := tut.NewTestDataTransfer() net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{}) - priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.DealPricingParams) (retrievalmarket.Ask, error) { + priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { ask := retrievalmarket.Ask{} ask.PricePerByte = expectedPricePerByte ask.PaymentInterval = expectedPaymentInterval @@ -739,7 +799,7 @@ func TestProvider_Construct(t *testing.T) { require.NoError(t, err) dt := tut.NewTestDataTransfer() - priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.DealPricingParams) (retrievalmarket.Ask, error) { + priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { ask := retrievalmarket.Ask{} return ask, nil } @@ -791,7 +851,7 @@ func TestProviderConfigOpts(t *testing.T) { multiStore, err := multistore.NewMultiDstore(ds) require.NoError(t, err) - priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.DealPricingParams) (retrievalmarket.Ask, error) { + priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { ask := retrievalmarket.Ask{} return ask, nil } @@ -971,7 +1031,7 @@ func TestProviderMigrations(t *testing.T) { err = providerDs.Put(datastore.NewKey("retrieval-ask"), askBuf.Bytes()) require.NoError(t, err) - priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.DealPricingParams) (retrievalmarket.Ask, error) { + priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { ask := retrievalmarket.Ask{} return ask, nil } diff --git a/retrievalmarket/impl/testnodes/test_retrieval_provider_node.go b/retrievalmarket/impl/testnodes/test_retrieval_provider_node.go index ff21c00d..2b41d0fe 100644 --- a/retrievalmarket/impl/testnodes/test_retrieval_provider_node.go +++ b/retrievalmarket/impl/testnodes/test_retrieval_provider_node.go @@ -9,6 +9,7 @@ import ( "io/ioutil" "testing" + "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" @@ -48,9 +49,13 @@ type TestRetrievalProviderNode struct { receivedVouchers map[expectedVoucherKey]struct{} expectedPricingParamDeals []abi.DealID - recievedPricingParamDeals []abi.DealID - unsealed map[sectorKey]struct{} - isVerified bool + receivedPricingParamDeals []abi.DealID + + expectedPricingPieceCID cid.Cid + receivedPricingPieceCID cid.Cid + + unsealed map[sectorKey]struct{} + isVerified bool } var _ retrievalmarket.RetrievalProviderNode = &TestRetrievalProviderNode{} @@ -81,13 +86,16 @@ func (trpn *TestRetrievalProviderNode) MarkVerified() { trpn.isVerified = true } -func (trpn *TestRetrievalProviderNode) ExpectPricingParamDeals(deals []abi.DealID) { +func (trpn *TestRetrievalProviderNode) ExpectPricingParams(pieceCID cid.Cid, deals []abi.DealID) { + trpn.expectedPricingPieceCID = pieceCID trpn.expectedPricingParamDeals = deals } -func (trpn *TestRetrievalProviderNode) GetDealPricingParams(_ context.Context, deals []abi.DealID) (retrievalmarket.DealPricingParams, error) { - trpn.recievedPricingParamDeals = deals - return retrievalmarket.DealPricingParams{ +func (trpn *TestRetrievalProviderNode) GetRetrievalPricingInput(_ context.Context, pieceCID cid.Cid, deals []abi.DealID) (retrievalmarket.PricingInput, error) { + trpn.receivedPricingParamDeals = deals + trpn.receivedPricingPieceCID = pieceCID + + return retrievalmarket.PricingInput{ VerifiedDeal: trpn.isVerified, }, nil } @@ -126,8 +134,9 @@ func (trpn *TestRetrievalProviderNode) UnsealSector(ctx context.Context, sectorI func (trpn *TestRetrievalProviderNode) VerifyExpectations(t *testing.T) { require.Equal(t, len(trpn.expectedVouchers), len(trpn.receivedVouchers)) require.Equal(t, trpn.expectations, trpn.received) + require.Equal(t, trpn.expectedPricingPieceCID, trpn.receivedPricingPieceCID) - require.Equal(t, trpn.expectedPricingParamDeals, trpn.recievedPricingParamDeals) + require.Equal(t, trpn.expectedPricingParamDeals, trpn.receivedPricingParamDeals) } // SavePaymentVoucher simulates saving a payment voucher with a stubbed result diff --git a/retrievalmarket/nodes.go b/retrievalmarket/nodes.go index 358b3c1b..8eb27e0d 100644 --- a/retrievalmarket/nodes.go +++ b/retrievalmarket/nodes.go @@ -55,5 +55,5 @@ type RetrievalProviderNode interface { IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) - GetDealPricingParams(ctx context.Context, storageDeals []abi.DealID) (DealPricingParams, error) + GetRetrievalPricingInput(ctx context.Context, pieceCID cid.Cid, storageDeals []abi.DealID) (PricingInput, error) } diff --git a/retrievalmarket/provider.go b/retrievalmarket/provider.go index e40a6d19..f2926aa0 100644 --- a/retrievalmarket/provider.go +++ b/retrievalmarket/provider.go @@ -21,8 +21,20 @@ type RetrievalProvider interface { // Stop stops handling incoming requests Stop() error + // SetAsk sets the retrieval payment parameters that this miner will accept + SetAsk(ask *Ask) + + // GetAsk returns the retrieval providers pricing information + GetAsk() *Ask + // SubscribeToEvents listens for events that happen related to client retrievals SubscribeToEvents(subscriber ProviderSubscriber) Unsubscribe ListDeals() map[ProviderDealIdentifier]ProviderDealState } + +// AskStore is an interface which provides access to a persisted retrieval Ask +type AskStore interface { + GetAsk() *Ask + SetAsk(ask *Ask) error +} diff --git a/retrievalmarket/storage_retrieval_integration_test.go b/retrievalmarket/storage_retrieval_integration_test.go index a1aecc09..b83602a1 100644 --- a/retrievalmarket/storage_retrieval_integration_test.go +++ b/retrievalmarket/storage_retrieval_integration_test.go @@ -500,7 +500,7 @@ func newRetrievalHarness(ctx context.Context, t *testing.T, sh *testharness.Stor p = params[0] } - priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.DealPricingParams) (retrievalmarket.Ask, error) { + priceFunc := func(ctx context.Context, dealPricingParams retrievalmarket.PricingInput) (retrievalmarket.Ask, error) { ask := retrievalmarket.Ask{} ask.PaymentInterval = p.PaymentInterval ask.PaymentIntervalIncrease = p.PaymentIntervalIncrease diff --git a/retrievalmarket/types.go b/retrievalmarket/types.go index e7b0d544..34680dd3 100644 --- a/retrievalmarket/types.go +++ b/retrievalmarket/types.go @@ -383,9 +383,20 @@ type ChannelAvailableFunds struct { VoucherReedeemedAmt abi.TokenAmount } -// DealPricingParams provides parameters required to price a retrieval deal -type DealPricingParams struct { - Client peer.ID +// PricingInput provides input parameters required to price a retrieval deal. +type PricingInput struct { + // PayloadCID is the cid of the payload to retrieve. + PayloadCID cid.Cid + // PieceCID is the cid of the Piece from which the Payload will be retrieved. + PieceCID cid.Cid + // PieceSize is the size of the Piece from which the payload will be retrieved. + PieceSize abi.UnpaddedPieceSize + // Client is the peerID of the retrieval client. + Client peer.ID + // VerifiedDeal is true if there exists a verified storage deal for the PayloadCID. VerifiedDeal bool - Unsealed bool + // Unsealed is true if there exists an unsealed sector from which we can retrieve the given payload. + Unsealed bool + // CurrentAsk is the current configured ask in the ask-store. + CurrentAsk Ask }