diff --git a/markets/dealfilter/cli.go b/markets/dealfilter/cli.go index 2cb9d6c4fc5..af832bfa08a 100644 --- a/markets/dealfilter/cli.go +++ b/markets/dealfilter/cli.go @@ -6,35 +6,57 @@ import ( "encoding/json" "os/exec" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/lotus/node/modules/dtypes" ) -func CliDealFilter(cmd string) dtypes.DealFilter { - // TODO: run some checks on the cmd string - +func CliStorageDealFilter(cmd string) dtypes.StorageDealFilter { return func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) { - j, err := json.MarshalIndent(deal, "", " ") - if err != nil { - return false, "", err + d := struct { + storagemarket.MinerDeal + DealType string + }{ + MinerDeal: deal, + DealType: "storage", } + return runDealFilter(ctx, cmd, d) + } +} - var out bytes.Buffer +func CliRetrievalDealFilter(cmd string) dtypes.RetrievalDealFilter { + return func(ctx context.Context, deal retrievalmarket.ProviderDealState) (bool, string, error) { + d := struct { + retrievalmarket.ProviderDealState + DealType string + }{ + ProviderDealState: deal, + DealType: "retrieval", + } + return runDealFilter(ctx, cmd, d) + } +} + +func runDealFilter(ctx context.Context, cmd string, deal interface{}) (bool, string, error) { + j, err := json.MarshalIndent(deal, "", " ") + if err != nil { + return false, "", err + } - c := exec.Command("sh", "-c", cmd) - c.Stdin = bytes.NewReader(j) - c.Stdout = &out - c.Stderr = &out + var out bytes.Buffer - switch err := c.Run().(type) { - case nil: - return true, "", nil - case *exec.ExitError: - return false, out.String(), nil - default: - return false, "filter cmd run error", err - } + c := exec.Command("sh", "-c", cmd) + c.Stdin = bytes.NewReader(j) + c.Stdout = &out + c.Stderr = &out + switch err := c.Run().(type) { + case nil: + return true, "", nil + case *exec.ExitError: + return false, out.String(), nil + default: + return false, "filter cmd run error", err } } diff --git a/node/builder.go b/node/builder.go index f96505eec37..1dc12e80a79 100644 --- a/node/builder.go +++ b/node/builder.go @@ -354,7 +354,8 @@ func Online() Option { Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer), Override(new(dtypes.ProviderPieceStore), modules.NewProviderPieceStore), Override(new(*storedask.StoredAsk), modules.NewStorageAsk), - Override(new(dtypes.DealFilter), modules.BasicDealFilter(nil)), + Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)), + Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)), Override(new(modules.ProviderDealFunds), modules.NewProviderDealFunds), Override(new(storagemarket.StorageProvider), modules.StorageProvider), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)), @@ -484,7 +485,11 @@ func ConfigStorageMiner(c interface{}) Option { ConfigCommon(&cfg.Common), If(cfg.Dealmaking.Filter != "", - Override(new(dtypes.DealFilter), modules.BasicDealFilter(dealfilter.CliDealFilter(cfg.Dealmaking.Filter))), + Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(dealfilter.CliStorageDealFilter(cfg.Dealmaking.Filter))), + ), + + If(cfg.Dealmaking.RetrievalFilter != "", + Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.RetrievalFilter))), ), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees)), diff --git a/node/config/def.go b/node/config/def.go index 7eb7e19f0f2..a1b57286c7e 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -45,7 +45,8 @@ type DealmakingConfig struct { PieceCidBlocklist []cid.Cid ExpectedSealDuration Duration - Filter string + Filter string + RetrievalFilter string } type SealingConfig struct { diff --git a/node/modules/dtypes/miner.go b/node/modules/dtypes/miner.go index 5bb439b4d6d..1ef157b7ed2 100644 --- a/node/modules/dtypes/miner.go +++ b/node/modules/dtypes/miner.go @@ -7,6 +7,7 @@ import ( "github.com/ipfs/go-cid" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" @@ -71,4 +72,5 @@ type SetExpectedSealDurationFunc func(time.Duration) error // too determine how long sealing is expected to take type GetExpectedSealDurationFunc func() (time.Duration, error) -type DealFilter func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) +type StorageDealFilter func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) +type RetrievalDealFilter func(ctx context.Context, deal retrievalmarket.ProviderDealState) (bool, string, error) diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index adacf540588..74dfff5e15d 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -412,16 +412,16 @@ func NewProviderDealFunds(ds dtypes.MetadataDS) (ProviderDealFunds, error) { return funds.NewDealFunds(ds, datastore.NewKey("/marketfunds/provider")) } -func BasicDealFilter(user dtypes.DealFilter) func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc, +func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc, offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc, blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc, expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc, - spn storagemarket.StorageProviderNode) dtypes.DealFilter { + spn storagemarket.StorageProviderNode) dtypes.StorageDealFilter { return func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc, offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc, blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc, expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc, - spn storagemarket.StorageProviderNode) dtypes.DealFilter { + spn storagemarket.StorageProviderNode) dtypes.StorageDealFilter { return func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) { b, err := onlineOk() @@ -497,7 +497,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress, pieceStore dtypes.ProviderPieceStore, dataTransfer dtypes.ProviderDataTransfer, spn storagemarket.StorageProviderNode, - df dtypes.DealFilter, + df dtypes.StorageDealFilter, funds ProviderDealFunds, ) (storagemarket.StorageProvider, error) { net := smnet.NewFromLibp2pHost(h) @@ -511,8 +511,52 @@ func StorageProvider(minerAddress dtypes.MinerAddress, return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, mds, pieceStore, dataTransfer, spn, address.Address(minerAddress), ffiConfig.SealProofType, storedAsk, funds, opt) } +func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, + offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalDealFilter { + return func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, + offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) dtypes.RetrievalDealFilter { + return func(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error) { + b, err := onlineOk() + if err != nil { + return false, "miner error", err + } + + if !b { + log.Warn("online retrieval deal consideration disabled; rejecting retrieval deal proposal from client") + return false, "miner is not accepting online retrieval deals", nil + } + + b, err = offlineOk() + if err != nil { + return false, "miner error", err + } + + if !b { + log.Info("offline retrieval has not been implemented yet") + } + + if userFilter != nil { + return userFilter(ctx, state) + } + + return true, "", nil + } + } +} + // RetrievalProvider creates a new retrieval provider attached to the provider blockstore -func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.SectorManager, full lapi.FullNode, ds dtypes.MetadataDS, pieceStore dtypes.ProviderPieceStore, mds dtypes.StagingMultiDstore, dt dtypes.ProviderDataTransfer, onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc) (retrievalmarket.RetrievalProvider, error) { +func RetrievalProvider(h host.Host, + miner *storage.Miner, + sealer sectorstorage.SectorManager, + full lapi.FullNode, + ds dtypes.MetadataDS, + pieceStore dtypes.ProviderPieceStore, + mds dtypes.StagingMultiDstore, + dt dtypes.ProviderDataTransfer, + onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc, + offlineOk dtypes.ConsiderOfflineRetrievalDealsConfigFunc, + userFilter dtypes.RetrievalDealFilter, +) (retrievalmarket.RetrievalProvider, error) { adapter := retrievaladapter.NewRetrievalProviderNode(miner, sealer, full) maddr, err := minerAddrFromDS(ds) @@ -521,29 +565,7 @@ func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.S } netwk := rmnet.NewFromLibp2pHost(h) - - opt := retrievalimpl.DealDeciderOpt(func(ctx context.Context, state retrievalmarket.ProviderDealState) (bool, string, error) { - b, err := onlineOk() - if err != nil { - return false, "miner error", err - } - - if !b { - log.Warn("online retrieval deal consideration disabled; rejecting retrieval deal proposal from client") - return false, "miner is not accepting online retrieval deals", nil - } - - b, err = offlineOk() - if err != nil { - return false, "miner error", err - } - - if !b { - log.Info("offline retrieval has not been implemented yet") - } - - return true, "", nil - }) + opt := retrievalimpl.DealDeciderOpt(retrievalimpl.DealDecider(userFilter)) return retrievalimpl.NewProvider(maddr, adapter, netwk, pieceStore, mds, dt, namespace.Wrap(ds, datastore.NewKey("/retrievals/provider")), opt) }