From 42b481fb61a51f30628b4aaff5a802d0db986018 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 21 Jan 2021 11:54:32 +0100 Subject: [PATCH 1/3] storageadapter: Look at precommits on-chain since deal publish msg --- markets/loggers/loggers.go | 8 +- markets/storageadapter/getcurrentdealinfo.go | 28 ++++--- .../storageadapter/getcurrentdealinfo_test.go | 14 +++- .../storageadapter/ondealsectorcommitted.go | 75 ++++++++++++++++--- 4 files changed, 96 insertions(+), 29 deletions(-) diff --git a/markets/loggers/loggers.go b/markets/loggers/loggers.go index 87c8dfe65a7..e5f669f2f5c 100644 --- a/markets/loggers/loggers.go +++ b/markets/loggers/loggers.go @@ -12,22 +12,22 @@ var log = logging.Logger("markets") // StorageClientLogger logs events from the storage client func StorageClientLogger(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) { - log.Infow("storage event", "name", storagemarket.ClientEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message) + log.Infow("storage client event", "name", storagemarket.ClientEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message) } // StorageProviderLogger logs events from the storage provider func StorageProviderLogger(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) { - log.Infow("storage event", "name", storagemarket.ProviderEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message) + log.Infow("storage provider event", "name", storagemarket.ProviderEvents[event], "proposal CID", deal.ProposalCid, "state", storagemarket.DealStates[deal.State], "message", deal.Message) } // RetrievalClientLogger logs events from the retrieval client func RetrievalClientLogger(event retrievalmarket.ClientEvent, deal retrievalmarket.ClientDealState) { - log.Infow("retrieval event", "name", retrievalmarket.ClientEvents[event], "deal ID", deal.ID, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message) + log.Infow("retrieval client event", "name", retrievalmarket.ClientEvents[event], "deal ID", deal.ID, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message) } // RetrievalProviderLogger logs events from the retrieval provider func RetrievalProviderLogger(event retrievalmarket.ProviderEvent, deal retrievalmarket.ProviderDealState) { - log.Infow("retrieval event", "name", retrievalmarket.ProviderEvents[event], "deal ID", deal.ID, "receiver", deal.Receiver, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message) + log.Infow("retrieval provider event", "name", retrievalmarket.ProviderEvents[event], "deal ID", deal.ID, "receiver", deal.Receiver, "state", retrievalmarket.DealStatuses[deal.Status], "message", deal.Message) } // DataTransferLogger logs events from the data transfer module diff --git a/markets/storageadapter/getcurrentdealinfo.go b/markets/storageadapter/getcurrentdealinfo.go index ab8c3f52fc6..5a80472079d 100644 --- a/markets/storageadapter/getcurrentdealinfo.go +++ b/markets/storageadapter/getcurrentdealinfo.go @@ -18,47 +18,51 @@ type getCurrentDealInfoAPI interface { StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error) StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error) + + StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) + ChainReadObj(context.Context, cid.Cid) ([]byte, error) + ChainHasObj(context.Context, cid.Cid) (bool, error) } // GetCurrentDealInfo gets current information on a deal, and corrects the deal ID as needed -func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, error) { +func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, types.TipSetKey, error) { marketDeal, dealErr := api.StateMarketStorageDeal(ctx, dealID, ts.Key()) if dealErr == nil { equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal) if err != nil { - return dealID, nil, err + return dealID, nil, types.EmptyTSK, err } if equal { - return dealID, marketDeal, nil + return dealID, marketDeal, types.EmptyTSK, nil } dealErr = xerrors.Errorf("Deal proposals did not match") } if publishCid == nil { - return dealID, nil, dealErr + return dealID, nil, types.EmptyTSK, dealErr } // attempt deal id correction lookup, err := api.StateSearchMsg(ctx, *publishCid) if err != nil { - return dealID, nil, err + return dealID, nil, types.EmptyTSK, err } if lookup.Receipt.ExitCode != exitcode.Ok { - return dealID, nil, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", *publishCid, lookup.Receipt.ExitCode) + return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", *publishCid, lookup.Receipt.ExitCode) } var retval market.PublishStorageDealsReturn if err := retval.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil { - return dealID, nil, xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err) + return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err) } if len(retval.IDs) != 1 { // market currently only ever sends messages with 1 deal - return dealID, nil, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal") + return dealID, nil, types.EmptyTSK, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal") } if retval.IDs[0] == dealID { // DealID did not change, so we are stuck with the original lookup error - return dealID, nil, dealErr + return dealID, nil, lookup.TipSet, dealErr } dealID = retval.IDs[0] @@ -67,13 +71,13 @@ func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDea if err == nil { equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal) if err != nil { - return dealID, nil, err + return dealID, nil, types.EmptyTSK, err } if !equal { - return dealID, nil, xerrors.Errorf("Deal proposals did not match") + return dealID, nil, types.EmptyTSK, xerrors.Errorf("Deal proposals did not match") } } - return dealID, marketDeal, err + return dealID, marketDeal, lookup.TipSet, err } func checkDealEquality(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, p1, p2 market.DealProposal) (bool, error) { diff --git a/markets/storageadapter/getcurrentdealinfo_test.go b/markets/storageadapter/getcurrentdealinfo_test.go index ed5d36c5b8c..46aaff2c368 100644 --- a/markets/storageadapter/getcurrentdealinfo_test.go +++ b/markets/storageadapter/getcurrentdealinfo_test.go @@ -209,7 +209,7 @@ func TestGetCurrentDealInfo(t *testing.T) { MarketDeals: marketDeals, } - dealID, marketDeal, err := GetCurrentDealInfo(ctx, ts, api, startDealID, proposal, data.publishCid) + dealID, marketDeal, _, err := GetCurrentDealInfo(ctx, ts, api, startDealID, proposal, data.publishCid) require.Equal(t, data.expectedDealID, dealID) require.Equal(t, data.expectedMarketDeal, marketDeal) if data.expectedError == nil { @@ -236,6 +236,18 @@ type mockGetCurrentDealInfoAPI struct { MarketDeals map[marketDealKey]*api.MarketDeal } +func (mapi *mockGetCurrentDealInfoAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { + panic("implement me") +} + +func (mapi *mockGetCurrentDealInfoAPI) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) { + panic("implement me") +} + +func (mapi *mockGetCurrentDealInfoAPI) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) { + panic("implement me") +} + func (mapi *mockGetCurrentDealInfoAPI) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, ts types.TipSetKey) (*api.MarketDeal, error) { deal, ok := mapi.MarketDeals[marketDealKey{dealID, ts}] if !ok { diff --git a/markets/storageadapter/ondealsectorcommitted.go b/markets/storageadapter/ondealsectorcommitted.go index bd59da7503f..7dd0a932f4b 100644 --- a/markets/storageadapter/ondealsectorcommitted.go +++ b/markets/storageadapter/ondealsectorcommitted.go @@ -5,16 +5,21 @@ import ( "context" "sync" + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + "golang.org/x/xerrors" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/api/apibstore" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors/adt" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/types" - "github.com/ipfs/go-cid" - "golang.org/x/xerrors" ) type sectorCommittedEventsAPI interface { @@ -32,7 +37,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev // First check if the deal is already active, and if so, bail out checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) + di, isActive, publishTs, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) if err != nil { // Note: the error returned from here will end up being returned // from OnDealSectorPreCommitted so no need to call the callback @@ -46,6 +51,52 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return true, false, nil } + if publishTs == types.EmptyTSK { + lookup, err := api.StateSearchMsg(ctx, *publishCid) + if err != nil { + return false, false, err + } + if lookup != nil { // can be nil in tests + publishTs = lookup.TipSet + } + } + + store := adt.WrapStore(ctx, cbor.NewCborStore(apibstore.NewAPIBlockstore(api))) + + publishAct, err := api.StateGetActor(ctx, provider, publishTs) + if err != nil { + return false, false, xerrors.Errorf("getting provider actor: %w", err) + } + + curAct, err := api.StateGetActor(ctx, provider, ts.Key()) + if err != nil { + return false, false, xerrors.Errorf("getting provider actor: %w", err) + } + + curSt, err := miner.Load(store, curAct) + if err != nil { + return false, false, xerrors.Errorf("leading miner actor: %w", err) + } + + pubSt, err := miner.Load(store, publishAct) + if err != nil { + return false, false, xerrors.Errorf("leading miner actor: %w", err) + } + + diff, err := miner.DiffPreCommits(pubSt, curSt) + if err != nil { + return false, false, xerrors.Errorf("diff precommits: %w", err) + } + + for _, info := range diff.Added { + for _, d := range info.Info.DealIDs { + if d == di { + cb(info.Info.SectorNumber, false, nil) + return true, false, nil + } + } + } + // Not yet active, start matching against incoming messages return false, true, nil } @@ -88,7 +139,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev // When the deal is published, the deal ID may change, so get the // current deal ID from the publish message CID - dealID, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) + dealID, _, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) if err != nil { return false, err } @@ -130,7 +181,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event // First check if the deal is already active, and if so, bail out checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - isActive, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) + _, isActive, _, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) if err != nil { // Note: the error returned from here will end up being returned // from OnDealSectorCommitted so no need to call the callback @@ -186,7 +237,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event } // Get the deal info - _, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) + _, sd, _, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) if err != nil { return false, xerrors.Errorf("failed to look up deal on chain: %w", err) } @@ -216,22 +267,22 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event return nil } -func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (bool, error) { - _, sd, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) +func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, bool, types.TipSetKey, error) { + di, sd, publishTs, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) if err != nil { // TODO: This may be fine for some errors - return false, xerrors.Errorf("failed to look up deal on chain: %w", err) + return 0, false, types.EmptyTSK, xerrors.Errorf("failed to look up deal on chain: %w", err) } // Sector with deal is already active if sd.State.SectorStartEpoch > 0 { - return true, nil + return 0, true, publishTs, nil } // Sector was slashed if sd.State.SlashEpoch > 0 { - return false, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch) + return 0, false, types.EmptyTSK, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch) } - return false, nil + return di, false, publishTs, nil } From de0a45228202aa875c343234a133d30db7ebda00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 21 Jan 2021 12:21:19 +0100 Subject: [PATCH 2/3] storageadapter: Fix tests, more testable diff api --- markets/storageadapter/client.go | 44 ++++++++++++++++--- markets/storageadapter/getcurrentdealinfo.go | 5 +-- .../storageadapter/getcurrentdealinfo_test.go | 13 ++---- .../storageadapter/ondealsectorcommitted.go | 29 +----------- .../ondealsectorcommitted_test.go | 3 +- markets/storageadapter/provider.go | 33 ++++++++++++++ 6 files changed, 78 insertions(+), 49 deletions(-) diff --git a/markets/storageadapter/client.go b/markets/storageadapter/client.go index 88a50931a18..b8604199067 100644 --- a/markets/storageadapter/client.go +++ b/markets/storageadapter/client.go @@ -9,6 +9,7 @@ import ( "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" "golang.org/x/xerrors" "github.com/filecoin-project/go-fil-markets/shared" @@ -22,8 +23,11 @@ import ( market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/apibstore" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors/adt" marketactor "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/events/state" "github.com/filecoin-project/lotus/chain/market" @@ -34,9 +38,7 @@ import ( ) type ClientNodeAdapter struct { - full.StateAPI - full.ChainAPI - full.MpoolAPI + *clientApi fundmgr *market.FundManager ev *events.Events @@ -46,14 +48,42 @@ type ClientNodeAdapter struct { type clientApi struct { full.ChainAPI full.StateAPI + full.MpoolAPI +} + +func (ca *clientApi) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) { + store := adt.WrapStore(ctx, cbor.NewCborStore(apibstore.NewAPIBlockstore(ca))) + + preAct, err := ca.StateGetActor(ctx, actor, pre) + if err != nil { + return nil, xerrors.Errorf("getting pre actor: %w", err) + } + curAct, err := ca.StateGetActor(ctx, actor, cur) + if err != nil { + return nil, xerrors.Errorf("getting cur actor: %w", err) + } + + preSt, err := miner.Load(store, preAct) + if err != nil { + return nil, xerrors.Errorf("loading miner actor: %w", err) + } + curSt, err := miner.Load(store, curAct) + if err != nil { + return nil, xerrors.Errorf("loading miner actor: %w", err) + } + + diff, err := miner.DiffPreCommits(preSt, curSt) + if err != nil { + return nil, xerrors.Errorf("diff precommits: %w", err) + } + + return diff, err } func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) storagemarket.StorageClientNode { - capi := &clientApi{chain, stateapi} + capi := &clientApi{chain, stateapi, mpool} return &ClientNodeAdapter{ - StateAPI: stateapi, - ChainAPI: chain, - MpoolAPI: mpool, + clientApi: capi, fundmgr: fundmgr, ev: events.NewEvents(context.TODO(), capi), diff --git a/markets/storageadapter/getcurrentdealinfo.go b/markets/storageadapter/getcurrentdealinfo.go index 5a80472079d..97311a0b284 100644 --- a/markets/storageadapter/getcurrentdealinfo.go +++ b/markets/storageadapter/getcurrentdealinfo.go @@ -9,6 +9,7 @@ import ( "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -19,9 +20,7 @@ type getCurrentDealInfoAPI interface { StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error) StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error) - StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) - ChainReadObj(context.Context, cid.Cid) ([]byte, error) - ChainHasObj(context.Context, cid.Cid) (bool, error) + diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) } // GetCurrentDealInfo gets current information on a deal, and corrects the deal ID as needed diff --git a/markets/storageadapter/getcurrentdealinfo_test.go b/markets/storageadapter/getcurrentdealinfo_test.go index 46aaff2c368..5e3c10495d0 100644 --- a/markets/storageadapter/getcurrentdealinfo_test.go +++ b/markets/storageadapter/getcurrentdealinfo_test.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" test "github.com/filecoin-project/lotus/chain/events/state/mock" "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" @@ -236,16 +237,8 @@ type mockGetCurrentDealInfoAPI struct { MarketDeals map[marketDealKey]*api.MarketDeal } -func (mapi *mockGetCurrentDealInfoAPI) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) { - panic("implement me") -} - -func (mapi *mockGetCurrentDealInfoAPI) ChainReadObj(ctx context.Context, c cid.Cid) ([]byte, error) { - panic("implement me") -} - -func (mapi *mockGetCurrentDealInfoAPI) ChainHasObj(ctx context.Context, c cid.Cid) (bool, error) { - panic("implement me") +func (mapi *mockGetCurrentDealInfoAPI) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) { + return &miner.PreCommitChanges{}, nil } func (mapi *mockGetCurrentDealInfoAPI) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, ts types.TipSetKey) (*api.MarketDeal, error) { diff --git a/markets/storageadapter/ondealsectorcommitted.go b/markets/storageadapter/ondealsectorcommitted.go index 7dd0a932f4b..6168fb5644f 100644 --- a/markets/storageadapter/ondealsectorcommitted.go +++ b/markets/storageadapter/ondealsectorcommitted.go @@ -6,16 +6,13 @@ import ( "sync" "github.com/ipfs/go-cid" - cbor "github.com/ipfs/go-ipld-cbor" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/api/apibstore" "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors/adt" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/events" @@ -61,31 +58,9 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev } } - store := adt.WrapStore(ctx, cbor.NewCborStore(apibstore.NewAPIBlockstore(api))) - - publishAct, err := api.StateGetActor(ctx, provider, publishTs) - if err != nil { - return false, false, xerrors.Errorf("getting provider actor: %w", err) - } - - curAct, err := api.StateGetActor(ctx, provider, ts.Key()) - if err != nil { - return false, false, xerrors.Errorf("getting provider actor: %w", err) - } - - curSt, err := miner.Load(store, curAct) + diff, err := api.diffPreCommits(ctx, provider, publishTs, ts.Key()) if err != nil { - return false, false, xerrors.Errorf("leading miner actor: %w", err) - } - - pubSt, err := miner.Load(store, publishAct) - if err != nil { - return false, false, xerrors.Errorf("leading miner actor: %w", err) - } - - diff, err := miner.DiffPreCommits(pubSt, curSt) - if err != nil { - return false, false, xerrors.Errorf("diff precommits: %w", err) + return false, false, err } for _, info := range diff.Added { diff --git a/markets/storageadapter/ondealsectorcommitted_test.go b/markets/storageadapter/ondealsectorcommitted_test.go index b74a1e53213..dea1f89d2e9 100644 --- a/markets/storageadapter/ondealsectorcommitted_test.go +++ b/markets/storageadapter/ondealsectorcommitted_test.go @@ -161,8 +161,7 @@ func TestOnDealSectorPreCommitted(t *testing.T) { deals: map[abi.DealID]*api.MarketDeal{}, }, }, - expectedCBCallCount: 1, - expectedCBError: errors.New("handling applied event: something went wrong"), + expectedCBCallCount: 0, expectedError: errors.New("failed to set up called handler: something went wrong"), }, "proposed deal epoch timeout": { diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index 4ce32d2bfa0..17f0a27ffd5 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -8,6 +8,7 @@ import ( "time" "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" @@ -21,9 +22,12 @@ import ( "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/api/apibstore" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/actors/adt" "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/events/state" "github.com/filecoin-project/lotus/chain/types" @@ -70,6 +74,35 @@ func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDA } } +func (n *ProviderNodeAdapter) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) { + store := adt.WrapStore(ctx, cbor.NewCborStore(apibstore.NewAPIBlockstore(n))) + + preAct, err := n.StateGetActor(ctx, actor, pre) + if err != nil { + return nil, xerrors.Errorf("getting pre actor: %w", err) + } + curAct, err := n.StateGetActor(ctx, actor, cur) + if err != nil { + return nil, xerrors.Errorf("getting cur actor: %w", err) + } + + preSt, err := miner.Load(store, preAct) + if err != nil { + return nil, xerrors.Errorf("loading miner actor: %w", err) + } + curSt, err := miner.Load(store, curAct) + if err != nil { + return nil, xerrors.Errorf("loading miner actor: %w", err) + } + + diff, err := miner.DiffPreCommits(preSt, curSt) + if err != nil { + return nil, xerrors.Errorf("diff precommits: %w", err) + } + + return diff, err +} + func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemarket.MinerDeal) (cid.Cid, error) { log.Info("publishing deal") From ea85a2457ed0268b341d7fd210fe442221726c74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 21 Jan 2021 15:33:52 +0100 Subject: [PATCH 3/3] storageadapter: Address review --- markets/storageadapter/api.go | 53 +++++++++++++++++++ markets/storageadapter/client.go | 37 ++----------- .../storageadapter/ondealsectorcommitted.go | 6 +++ markets/storageadapter/provider.go | 37 ++----------- 4 files changed, 65 insertions(+), 68 deletions(-) create mode 100644 markets/storageadapter/api.go diff --git a/markets/storageadapter/api.go b/markets/storageadapter/api.go new file mode 100644 index 00000000000..9d89c7aa402 --- /dev/null +++ b/markets/storageadapter/api.go @@ -0,0 +1,53 @@ +package storageadapter + +import ( + "context" + + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/actors/adt" + + "github.com/filecoin-project/lotus/api/apibstore" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/types" +) + +type apiWrapper struct { + api interface { + StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) + ChainReadObj(context.Context, cid.Cid) ([]byte, error) + ChainHasObj(context.Context, cid.Cid) (bool, error) + } +} + +func (ca *apiWrapper) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) { + store := adt.WrapStore(ctx, cbor.NewCborStore(apibstore.NewAPIBlockstore(ca.api))) + + preAct, err := ca.api.StateGetActor(ctx, actor, pre) + if err != nil { + return nil, xerrors.Errorf("getting pre actor: %w", err) + } + curAct, err := ca.api.StateGetActor(ctx, actor, cur) + if err != nil { + return nil, xerrors.Errorf("getting cur actor: %w", err) + } + + preSt, err := miner.Load(store, preAct) + if err != nil { + return nil, xerrors.Errorf("loading miner actor: %w", err) + } + curSt, err := miner.Load(store, curAct) + if err != nil { + return nil, xerrors.Errorf("loading miner actor: %w", err) + } + + diff, err := miner.DiffPreCommits(preSt, curSt) + if err != nil { + return nil, xerrors.Errorf("diff precommits: %w", err) + } + + return diff, err +} diff --git a/markets/storageadapter/client.go b/markets/storageadapter/client.go index b8604199067..f3491da4756 100644 --- a/markets/storageadapter/client.go +++ b/markets/storageadapter/client.go @@ -9,7 +9,6 @@ import ( "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" "github.com/ipfs/go-cid" - cbor "github.com/ipfs/go-ipld-cbor" "golang.org/x/xerrors" "github.com/filecoin-project/go-fil-markets/shared" @@ -23,11 +22,8 @@ import ( market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/api/apibstore" "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors/adt" marketactor "github.com/filecoin-project/lotus/chain/actors/builtin/market" - "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/events/state" "github.com/filecoin-project/lotus/chain/market" @@ -39,6 +35,7 @@ import ( type ClientNodeAdapter struct { *clientApi + *apiWrapper fundmgr *market.FundManager ev *events.Events @@ -51,39 +48,11 @@ type clientApi struct { full.MpoolAPI } -func (ca *clientApi) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) { - store := adt.WrapStore(ctx, cbor.NewCborStore(apibstore.NewAPIBlockstore(ca))) - - preAct, err := ca.StateGetActor(ctx, actor, pre) - if err != nil { - return nil, xerrors.Errorf("getting pre actor: %w", err) - } - curAct, err := ca.StateGetActor(ctx, actor, cur) - if err != nil { - return nil, xerrors.Errorf("getting cur actor: %w", err) - } - - preSt, err := miner.Load(store, preAct) - if err != nil { - return nil, xerrors.Errorf("loading miner actor: %w", err) - } - curSt, err := miner.Load(store, curAct) - if err != nil { - return nil, xerrors.Errorf("loading miner actor: %w", err) - } - - diff, err := miner.DiffPreCommits(preSt, curSt) - if err != nil { - return nil, xerrors.Errorf("diff precommits: %w", err) - } - - return diff, err -} - func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) storagemarket.StorageClientNode { capi := &clientApi{chain, stateapi, mpool} return &ClientNodeAdapter{ - clientApi: capi, + clientApi: capi, + apiWrapper: &apiWrapper{api: capi}, fundmgr: fundmgr, ev: events.NewEvents(context.TODO(), capi), diff --git a/markets/storageadapter/ondealsectorcommitted.go b/markets/storageadapter/ondealsectorcommitted.go index 6168fb5644f..5466c81ef4a 100644 --- a/markets/storageadapter/ondealsectorcommitted.go +++ b/markets/storageadapter/ondealsectorcommitted.go @@ -48,6 +48,12 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return true, false, nil } + // Check that precommits which landed between when the deal was published + // and now don't already contain the deal we care about. + // (this can happen when the precommit lands vary quickly (in tests), or + // when the client node was down after the deal was published, and when + // the precommit containing it landed on chain) + if publishTs == types.EmptyTSK { lookup, err := api.StateSearchMsg(ctx, *publishCid) if err != nil { diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index 17f0a27ffd5..085888ee31a 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -8,7 +8,6 @@ import ( "time" "github.com/ipfs/go-cid" - cbor "github.com/ipfs/go-ipld-cbor" logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" @@ -22,12 +21,9 @@ import ( "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/api/apibstore" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors" - "github.com/filecoin-project/lotus/chain/actors/adt" "github.com/filecoin-project/lotus/chain/actors/builtin/market" - "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/events/state" "github.com/filecoin-project/lotus/chain/types" @@ -45,6 +41,7 @@ var log = logging.Logger("storageadapter") type ProviderNodeAdapter struct { api.FullNode + *apiWrapper // this goes away with the data transfer module dag dtypes.StagingDAG @@ -59,7 +56,8 @@ type ProviderNodeAdapter struct { func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { return func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { na := &ProviderNodeAdapter{ - FullNode: full, + FullNode: full, + apiWrapper: &apiWrapper{api: full}, dag: dag, secb: secb, @@ -74,35 +72,6 @@ func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDA } } -func (n *ProviderNodeAdapter) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) { - store := adt.WrapStore(ctx, cbor.NewCborStore(apibstore.NewAPIBlockstore(n))) - - preAct, err := n.StateGetActor(ctx, actor, pre) - if err != nil { - return nil, xerrors.Errorf("getting pre actor: %w", err) - } - curAct, err := n.StateGetActor(ctx, actor, cur) - if err != nil { - return nil, xerrors.Errorf("getting cur actor: %w", err) - } - - preSt, err := miner.Load(store, preAct) - if err != nil { - return nil, xerrors.Errorf("loading miner actor: %w", err) - } - curSt, err := miner.Load(store, curAct) - if err != nil { - return nil, xerrors.Errorf("loading miner actor: %w", err) - } - - diff, err := miner.DiffPreCommits(preSt, curSt) - if err != nil { - return nil, xerrors.Errorf("diff precommits: %w", err) - } - - return diff, err -} - func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemarket.MinerDeal) (cid.Cid, error) { log.Info("publishing deal")