From 80b6994fe2cfede39053c452a335fb6dd19c9122 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 16 Sep 2020 19:13:12 -0700 Subject: [PATCH] feat(market): update state diffing for market actor Update to abstract actor for markets state diffing. Also move the diff adt functions inside the abstract actors --- .../{events/state => actors/adt}/diff_adt.go | 7 +- .../state => actors/adt}/diff_adt_test.go | 19 +- chain/actors/builtin/market/market.go | 52 +++++ chain/actors/builtin/market/v0.go | 201 +++++++++++++++- chain/events/state/predicates.go | 221 +++++------------- chain/events/state/predicates_test.go | 81 ++++--- cmd/lotus-chainwatch/processor/market.go | 7 +- 7 files changed, 373 insertions(+), 215 deletions(-) rename chain/{events/state => actors/adt}/diff_adt.go (94%) rename chain/{events/state => actors/adt}/diff_adt_test.go (96%) diff --git a/chain/events/state/diff_adt.go b/chain/actors/adt/diff_adt.go similarity index 94% rename from chain/events/state/diff_adt.go rename to chain/actors/adt/diff_adt.go index 519c61c2df7..0784b77a2f1 100644 --- a/chain/events/state/diff_adt.go +++ b/chain/actors/adt/diff_adt.go @@ -1,10 +1,9 @@ -package state +package adt import ( "bytes" "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/specs-actors/actors/util/adt" typegen "github.com/whyrusleeping/cbor-gen" ) @@ -27,7 +26,7 @@ type AdtArrayDiff interface { // - All values that exist in curArr nnd not in prevArr are passed to adtArrayDiff.Add() // - All values that exist in preArr and in curArr are passed to AdtArrayDiff.Modify() // - It is the responsibility of AdtArrayDiff.Modify() to determine if the values it was passed have been modified. -func DiffAdtArray(preArr, curArr *adt.Array, out AdtArrayDiff) error { +func DiffAdtArray(preArr, curArr Array, out AdtArrayDiff) error { prevVal := new(typegen.Deferred) if err := preArr.ForEach(prevVal, func(i int64) error { curVal := new(typegen.Deferred) @@ -76,7 +75,7 @@ type AdtMapDiff interface { Remove(key string, val *typegen.Deferred) error } -func DiffAdtMap(preMap, curMap *adt.Map, out AdtMapDiff) error { +func DiffAdtMap(preMap, curMap Map, out AdtMapDiff) error { prevVal := new(typegen.Deferred) if err := preMap.ForEach(prevVal, func(key string) error { curVal := new(typegen.Deferred) diff --git a/chain/events/state/diff_adt_test.go b/chain/actors/adt/diff_adt_test.go similarity index 96% rename from chain/events/state/diff_adt_test.go rename to chain/actors/adt/diff_adt_test.go index 55926afb9ad..436e28bbf52 100644 --- a/chain/events/state/diff_adt_test.go +++ b/chain/actors/adt/diff_adt_test.go @@ -1,4 +1,4 @@ -package state +package adt import ( "bytes" @@ -13,7 +13,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/specs-actors/actors/runtime" - "github.com/filecoin-project/specs-actors/actors/util/adt" + v0adt "github.com/filecoin-project/specs-actors/actors/util/adt" bstore "github.com/filecoin-project/lotus/lib/blockstore" ) @@ -22,8 +22,8 @@ func TestDiffAdtArray(t *testing.T) { ctxstoreA := newContextStore() ctxstoreB := newContextStore() - arrA := adt.MakeEmptyArray(ctxstoreA) - arrB := adt.MakeEmptyArray(ctxstoreB) + arrA := v0adt.MakeEmptyArray(ctxstoreA) + arrB := v0adt.MakeEmptyArray(ctxstoreB) require.NoError(t, arrA.Set(0, runtime.CBORBytes([]byte{0}))) // delete @@ -76,8 +76,8 @@ func TestDiffAdtMap(t *testing.T) { ctxstoreA := newContextStore() ctxstoreB := newContextStore() - mapA := adt.MakeEmptyMap(ctxstoreA) - mapB := adt.MakeEmptyMap(ctxstoreB) + mapA := v0adt.MakeEmptyMap(ctxstoreA) + mapB := v0adt.MakeEmptyMap(ctxstoreB) require.NoError(t, mapA.Put(abi.UIntKey(0), runtime.CBORBytes([]byte{0}))) // delete @@ -292,12 +292,9 @@ func (t *TestDiffArray) Remove(key uint64, val *typegen.Deferred) error { return nil } -func newContextStore() *contextStore { +func newContextStore() Store { ctx := context.Background() bs := bstore.NewTemporarySync() store := cbornode.NewCborStore(bs) - return &contextStore{ - ctx: ctx, - cst: store, - } + return WrapStore(ctx, store) } diff --git a/chain/actors/builtin/market/market.go b/chain/actors/builtin/market/market.go index 99cca98797b..051c46dbeb4 100644 --- a/chain/actors/builtin/market/market.go +++ b/chain/actors/builtin/market/market.go @@ -29,9 +29,14 @@ func Load(store adt.Store, act *types.Actor) (st State, err error) { type State interface { cbor.Marshaler + BalancesChanged(State) bool EscrowTable() (BalanceTable, error) LockedTable() (BalanceTable, error) TotalLocked() (abi.TokenAmount, error) + StatesChanged(State) bool + States() (DealStates, error) + ProposalsChanged(State) bool + Proposals() (DealProposals, error) VerifyDealsForActivation( minerAddr address.Address, deals []abi.DealID, currEpoch, sectorExpiry abi.ChainEpoch, ) (weight, verifiedWeight abi.DealWeight, err error) @@ -40,3 +45,50 @@ type State interface { type BalanceTable interface { Get(key address.Address) (abi.TokenAmount, error) } + +type DealStates interface { + GetDeal(key abi.DealID) (DealState, error) + Diff(DealStates) (*DealStateChanges, error) +} + +type DealProposals interface { + Diff(DealProposals) (*DealProposalChanges, error) +} + +type DealState interface { + SectorStartEpoch() abi.ChainEpoch + SlashEpoch() abi.ChainEpoch + LastUpdatedEpoch() abi.ChainEpoch + Equals(DealState) bool +} + +type DealProposal interface { +} + +type DealStateChanges struct { + Added []DealIDState + Modified []DealStateChange + Removed []DealIDState +} + +type DealIDState struct { + ID abi.DealID + Deal DealState +} + +// DealStateChange is a change in deal state from -> to +type DealStateChange struct { + ID abi.DealID + From DealState + To DealState +} + +type DealProposalChanges struct { + Added []ProposalIDState + Removed []ProposalIDState +} + +type ProposalIDState struct { + ID abi.DealID + Proposal DealProposal +} diff --git a/chain/actors/builtin/market/v0.go b/chain/actors/builtin/market/v0.go index fb67902da21..8f0aa05946d 100644 --- a/chain/actors/builtin/market/v0.go +++ b/chain/actors/builtin/market/v0.go @@ -1,11 +1,17 @@ package market import ( + "bytes" + "errors" + "fmt" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/chain/actors/adt" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/specs-actors/actors/builtin/market" - "github.com/filecoin-project/specs-actors/actors/util/adt" + v0adt "github.com/filecoin-project/specs-actors/actors/util/adt" + typegen "github.com/whyrusleeping/cbor-gen" ) type v0State struct { @@ -19,12 +25,58 @@ func (s *v0State) TotalLocked() (abi.TokenAmount, error) { return fml, nil } +func (s *v0State) BalancesChanged(otherState State) bool { + v0otherState, ok := otherState.(*v0State) + if !ok { + // there's no way to compare differnt versions of the state, so let's + // just say that means the state of balances has changed + return true + } + return !s.State.EscrowTable.Equals(v0otherState.State.EscrowTable) || !s.State.LockedTable.Equals(v0otherState.State.LockedTable) +} + +func (s *v0State) StatesChanged(otherState State) bool { + v0otherState, ok := otherState.(*v0State) + if !ok { + // there's no way to compare differnt versions of the state, so let's + // just say that means the state of balances has changed + return true + } + return !s.State.States.Equals(v0otherState.State.States) +} + +func (s *v0State) States() (DealStates, error) { + stateArray, err := v0adt.AsArray(s.store, s.State.States) + if err != nil { + return nil, err + } + return &v0DealStates{stateArray}, nil +} + +func (s *v0State) ProposalsChanged(otherState State) bool { + v0otherState, ok := otherState.(*v0State) + if !ok { + // there's no way to compare differnt versions of the state, so let's + // just say that means the state of balances has changed + return true + } + return !s.State.Proposals.Equals(v0otherState.State.Proposals) +} + +func (s *v0State) Proposals() (DealProposals, error) { + proposalArray, err := v0adt.AsArray(s.store, s.State.Proposals) + if err != nil { + return nil, err + } + return &v0DealProposals{proposalArray}, nil +} + func (s *v0State) EscrowTable() (BalanceTable, error) { - return adt.AsBalanceTable(s.store, s.State.EscrowTable) + return v0adt.AsBalanceTable(s.store, s.State.EscrowTable) } func (s *v0State) LockedTable() (BalanceTable, error) { - return adt.AsBalanceTable(s.store, s.State.LockedTable) + return v0adt.AsBalanceTable(s.store, s.State.LockedTable) } func (s *v0State) VerifyDealsForActivation( @@ -32,3 +84,146 @@ func (s *v0State) VerifyDealsForActivation( ) (weight, verifiedWeight abi.DealWeight, err error) { return market.ValidateDealsForActivation(&s.State, s.store, deals, minerAddr, sectorExpiry, currEpoch) } + +type v0DealStates struct { + adt.Array +} + +func (s *v0DealStates) GetDeal(dealID abi.DealID) (DealState, error) { + var deal market.DealState + found, err := s.Array.Get(uint64(dealID), &deal) + if err != nil { + return nil, err + } + if !found { + return nil, nil + } + return &v0DealState{deal}, nil +} + +func (s *v0DealStates) Diff(other DealStates) (*DealStateChanges, error) { + v0other, ok := other.(*v0DealStates) + if !ok { + // TODO handle this if possible on a case by case basis but for now, just fail + return nil, errors.New("cannot compare deal states across versions") + } + results := new(DealStateChanges) + if err := adt.DiffAdtArray(s, v0other, &v0MarketStatesDiffer{results}); err != nil { + return nil, fmt.Errorf("diffing deal states: %w", err) + } + + return results, nil +} + +type v0MarketStatesDiffer struct { + Results *DealStateChanges +} + +func (d *v0MarketStatesDiffer) Add(key uint64, val *typegen.Deferred) error { + ds := new(v0DealState) + err := ds.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + d.Results.Added = append(d.Results.Added, DealIDState{abi.DealID(key), ds}) + return nil +} + +func (d *v0MarketStatesDiffer) Modify(key uint64, from, to *typegen.Deferred) error { + dsFrom := new(v0DealState) + if err := dsFrom.UnmarshalCBOR(bytes.NewReader(from.Raw)); err != nil { + return err + } + + dsTo := new(v0DealState) + if err := dsTo.UnmarshalCBOR(bytes.NewReader(to.Raw)); err != nil { + return err + } + + if *dsFrom != *dsTo { + d.Results.Modified = append(d.Results.Modified, DealStateChange{abi.DealID(key), dsFrom, dsTo}) + } + return nil +} + +func (d *v0MarketStatesDiffer) Remove(key uint64, val *typegen.Deferred) error { + ds := new(v0DealState) + err := ds.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + d.Results.Removed = append(d.Results.Removed, DealIDState{abi.DealID(key), ds}) + return nil +} + +type v0DealState struct { + market.DealState +} + +func (ds *v0DealState) SectorStartEpoch() abi.ChainEpoch { + return ds.DealState.SectorStartEpoch +} + +func (ds *v0DealState) SlashEpoch() abi.ChainEpoch { + return ds.DealState.SlashEpoch +} + +func (ds *v0DealState) LastUpdatedEpoch() abi.ChainEpoch { + return ds.DealState.LastUpdatedEpoch +} + +func (ds *v0DealState) Equals(other DealState) bool { + v0other, ok := other.(*v0DealState) + return ok && *ds == *v0other +} + +type v0DealProposals struct { + adt.Array +} + +func (s *v0DealProposals) Diff(other DealProposals) (*DealProposalChanges, error) { + v0other, ok := other.(*v0DealProposals) + if !ok { + // TODO handle this if possible on a case by case basis but for now, just fail + return nil, errors.New("cannot compare deal proposals across versions") + } + results := new(DealProposalChanges) + if err := adt.DiffAdtArray(s, v0other, &v0MarketProposalsDiffer{results}); err != nil { + return nil, fmt.Errorf("diffing deal proposals: %w", err) + } + + return results, nil +} + +type v0MarketProposalsDiffer struct { + Results *DealProposalChanges +} + +type v0DealProposal struct { + market.DealProposal +} + +func (d *v0MarketProposalsDiffer) Add(key uint64, val *typegen.Deferred) error { + dp := new(v0DealProposal) + err := dp.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + d.Results.Added = append(d.Results.Added, ProposalIDState{abi.DealID(key), dp}) + return nil +} + +func (d *v0MarketProposalsDiffer) Modify(key uint64, from, to *typegen.Deferred) error { + // short circuit, DealProposals are static + return nil +} + +func (d *v0MarketProposalsDiffer) Remove(key uint64, val *typegen.Deferred) error { + dp := new(v0DealProposal) + err := dp.UnmarshalCBOR(bytes.NewReader(val.Raw)) + if err != nil { + return err + } + d.Results.Removed = append(d.Results.Removed, ProposalIDState{abi.DealID(key), dp}) + return nil +} diff --git a/chain/events/state/predicates.go b/chain/events/state/predicates.go index e5caa41d243..e1e88e1a1d0 100644 --- a/chain/events/state/predicates.go +++ b/chain/events/state/predicates.go @@ -7,12 +7,13 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/lotus/chain/actors/adt" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/paych" "github.com/filecoin-project/specs-actors/actors/builtin" init_ "github.com/filecoin-project/specs-actors/actors/builtin/init" - "github.com/filecoin-project/specs-actors/actors/builtin/market" "github.com/filecoin-project/specs-actors/actors/builtin/miner" - "github.com/filecoin-project/specs-actors/actors/util/adt" + v0adt "github.com/filecoin-project/specs-actors/actors/util/adt" cbor "github.com/ipfs/go-ipld-cbor" typegen "github.com/whyrusleeping/cbor-gen" @@ -69,26 +70,26 @@ func (sp *StatePredicates) OnActorStateChanged(addr address.Address, diffStateFu } } -type DiffStorageMarketStateFunc func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) +type DiffStorageMarketStateFunc func(ctx context.Context, oldState market.State, newState market.State) (changed bool, user UserData, err error) // OnStorageMarketActorChanged calls diffStorageMarketState when the state changes for the market actor func (sp *StatePredicates) OnStorageMarketActorChanged(diffStorageMarketState DiffStorageMarketStateFunc) DiffTipSetKeyFunc { return sp.OnActorStateChanged(builtin.StorageMarketActorAddr, func(ctx context.Context, oldActorState, newActorState *types.Actor) (changed bool, user UserData, err error) { - var oldState market.State - if err := sp.cst.Get(ctx, oldActorState.Head, &oldState); err != nil { + oldState, err := market.Load(adt.WrapStore(ctx, sp.cst), oldActorState) + if err != nil { return false, nil, err } - var newState market.State - if err := sp.cst.Get(ctx, newActorState.Head, &newState); err != nil { + newState, err := market.Load(adt.WrapStore(ctx, sp.cst), newActorState) + if err != nil { return false, nil, err } - return diffStorageMarketState(ctx, &oldState, &newState) + return diffStorageMarketState(ctx, oldState, newState) }) } type BalanceTables struct { - EscrowTable *adt.BalanceTable - LockedTable *adt.BalanceTable + EscrowTable market.BalanceTable + LockedTable market.BalanceTable } // DiffBalanceTablesFunc compares two balance tables @@ -96,32 +97,27 @@ type DiffBalanceTablesFunc func(ctx context.Context, oldBalanceTable, newBalance // OnBalanceChanged runs when the escrow table for available balances changes func (sp *StatePredicates) OnBalanceChanged(diffBalances DiffBalanceTablesFunc) DiffStorageMarketStateFunc { - return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) { - if oldState.EscrowTable.Equals(newState.EscrowTable) && oldState.LockedTable.Equals(newState.LockedTable) { + return func(ctx context.Context, oldState market.State, newState market.State) (changed bool, user UserData, err error) { + if !oldState.BalancesChanged(newState) { return false, nil, nil } - ctxStore := &contextStore{ - ctx: ctx, - cst: sp.cst, - } - - oldEscrowRoot, err := adt.AsBalanceTable(ctxStore, oldState.EscrowTable) + oldEscrowRoot, err := oldState.EscrowTable() if err != nil { return false, nil, err } - oldLockedRoot, err := adt.AsBalanceTable(ctxStore, oldState.LockedTable) + oldLockedRoot, err := oldState.LockedTable() if err != nil { return false, nil, err } - newEscrowRoot, err := adt.AsBalanceTable(ctxStore, newState.EscrowTable) + newEscrowRoot, err := newState.EscrowTable() if err != nil { return false, nil, err } - newLockedRoot, err := adt.AsBalanceTable(ctxStore, newState.LockedTable) + newLockedRoot, err := newState.LockedTable() if err != nil { return false, nil, err } @@ -130,25 +126,22 @@ func (sp *StatePredicates) OnBalanceChanged(diffBalances DiffBalanceTablesFunc) } } -type DiffAdtArraysFunc func(ctx context.Context, oldDealStateRoot, newDealStateRoot *adt.Array) (changed bool, user UserData, err error) +type DiffDealStatesFunc func(ctx context.Context, oldDealStateRoot, newDealStateRoot market.DealStates) (changed bool, user UserData, err error) +type DiffDealProposalsFunc func(ctx context.Context, oldDealStateRoot, newDealStateRoot market.DealProposals) (changed bool, user UserData, err error) +type DiffAdtArraysFunc func(ctx context.Context, oldDealStateRoot, newDealStateRoot adt.Array) (changed bool, user UserData, err error) // OnDealStateChanged calls diffDealStates when the market deal state changes -func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffAdtArraysFunc) DiffStorageMarketStateFunc { - return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) { - if oldState.States.Equals(newState.States) { +func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffDealStatesFunc) DiffStorageMarketStateFunc { + return func(ctx context.Context, oldState market.State, newState market.State) (changed bool, user UserData, err error) { + if !oldState.StatesChanged(newState) { return false, nil, nil } - ctxStore := &contextStore{ - ctx: ctx, - cst: sp.cst, - } - - oldRoot, err := adt.AsArray(ctxStore, oldState.States) + oldRoot, err := oldState.States() if err != nil { return false, nil, err } - newRoot, err := adt.AsArray(ctxStore, newState.States) + newRoot, err := newState.States() if err != nil { return false, nil, err } @@ -158,22 +151,17 @@ func (sp *StatePredicates) OnDealStateChanged(diffDealStates DiffAdtArraysFunc) } // OnDealProposalChanged calls diffDealProps when the market proposal state changes -func (sp *StatePredicates) OnDealProposalChanged(diffDealProps DiffAdtArraysFunc) DiffStorageMarketStateFunc { - return func(ctx context.Context, oldState *market.State, newState *market.State) (changed bool, user UserData, err error) { - if oldState.Proposals.Equals(newState.Proposals) { +func (sp *StatePredicates) OnDealProposalChanged(diffDealProps DiffDealProposalsFunc) DiffStorageMarketStateFunc { + return func(ctx context.Context, oldState market.State, newState market.State) (changed bool, user UserData, err error) { + if !oldState.ProposalsChanged(newState) { return false, nil, nil } - ctxStore := &contextStore{ - ctx: ctx, - cst: sp.cst, - } - - oldRoot, err := adt.AsArray(ctxStore, oldState.Proposals) + oldRoot, err := oldState.Proposals() if err != nil { return false, nil, err } - newRoot, err := adt.AsArray(ctxStore, newState.Proposals) + newRoot, err := newState.Proposals() if err != nil { return false, nil, err } @@ -182,51 +170,14 @@ func (sp *StatePredicates) OnDealProposalChanged(diffDealProps DiffAdtArraysFunc } } -var _ AdtArrayDiff = &MarketDealProposalChanges{} - -type MarketDealProposalChanges struct { - Added []ProposalIDState - Removed []ProposalIDState -} - -type ProposalIDState struct { - ID abi.DealID - Proposal market.DealProposal -} - -func (m *MarketDealProposalChanges) Add(key uint64, val *typegen.Deferred) error { - dp := new(market.DealProposal) - err := dp.UnmarshalCBOR(bytes.NewReader(val.Raw)) - if err != nil { - return err - } - m.Added = append(m.Added, ProposalIDState{abi.DealID(key), *dp}) - return nil -} - -func (m *MarketDealProposalChanges) Modify(key uint64, from, to *typegen.Deferred) error { - // short circuit, DealProposals are static - return nil -} - -func (m *MarketDealProposalChanges) Remove(key uint64, val *typegen.Deferred) error { - dp := new(market.DealProposal) - err := dp.UnmarshalCBOR(bytes.NewReader(val.Raw)) - if err != nil { - return err - } - m.Removed = append(m.Removed, ProposalIDState{abi.DealID(key), *dp}) - return nil -} - // OnDealProposalAmtChanged detects changes in the deal proposal AMT for all deal proposals and returns a MarketProposalsChanges structure containing: // - Added Proposals // - Modified Proposals // - Removed Proposals -func (sp *StatePredicates) OnDealProposalAmtChanged() DiffAdtArraysFunc { - return func(ctx context.Context, oldDealProps, newDealProps *adt.Array) (changed bool, user UserData, err error) { - proposalChanges := new(MarketDealProposalChanges) - if err := DiffAdtArray(oldDealProps, newDealProps, proposalChanges); err != nil { +func (sp *StatePredicates) OnDealProposalAmtChanged() DiffDealProposalsFunc { + return func(ctx context.Context, oldDealProps, newDealProps market.DealProposals) (changed bool, user UserData, err error) { + proposalChanges, err := oldDealProps.Diff(newDealProps) + if err != nil { return false, nil, err } @@ -238,64 +189,14 @@ func (sp *StatePredicates) OnDealProposalAmtChanged() DiffAdtArraysFunc { } } -var _ AdtArrayDiff = &MarketDealStateChanges{} - -type MarketDealStateChanges struct { - Added []DealIDState - Modified []DealStateChange - Removed []DealIDState -} - -type DealIDState struct { - ID abi.DealID - Deal market.DealState -} - -func (m *MarketDealStateChanges) Add(key uint64, val *typegen.Deferred) error { - ds := new(market.DealState) - err := ds.UnmarshalCBOR(bytes.NewReader(val.Raw)) - if err != nil { - return err - } - m.Added = append(m.Added, DealIDState{abi.DealID(key), *ds}) - return nil -} - -func (m *MarketDealStateChanges) Modify(key uint64, from, to *typegen.Deferred) error { - dsFrom := new(market.DealState) - if err := dsFrom.UnmarshalCBOR(bytes.NewReader(from.Raw)); err != nil { - return err - } - - dsTo := new(market.DealState) - if err := dsTo.UnmarshalCBOR(bytes.NewReader(to.Raw)); err != nil { - return err - } - - if *dsFrom != *dsTo { - m.Modified = append(m.Modified, DealStateChange{abi.DealID(key), dsFrom, dsTo}) - } - return nil -} - -func (m *MarketDealStateChanges) Remove(key uint64, val *typegen.Deferred) error { - ds := new(market.DealState) - err := ds.UnmarshalCBOR(bytes.NewReader(val.Raw)) - if err != nil { - return err - } - m.Removed = append(m.Removed, DealIDState{abi.DealID(key), *ds}) - return nil -} - // OnDealStateAmtChanged detects changes in the deal state AMT for all deal states and returns a MarketDealStateChanges structure containing: // - Added Deals // - Modified Deals // - Removed Deals -func (sp *StatePredicates) OnDealStateAmtChanged() DiffAdtArraysFunc { - return func(ctx context.Context, oldDealStates, newDealStates *adt.Array) (changed bool, user UserData, err error) { - dealStateChanges := new(MarketDealStateChanges) - if err := DiffAdtArray(oldDealStates, newDealStates, dealStateChanges); err != nil { +func (sp *StatePredicates) OnDealStateAmtChanged() DiffDealStatesFunc { + return func(ctx context.Context, oldDealStates, newDealStates market.DealStates) (changed bool, user UserData, err error) { + dealStateChanges, err := oldDealStates.Diff(newDealStates) + if err != nil { return false, nil, err } @@ -313,37 +214,31 @@ type ChangedDeals map[abi.DealID]DealStateChange // DealStateChange is a change in deal state from -> to type DealStateChange struct { ID abi.DealID - From *market.DealState - To *market.DealState + From market.DealState + To market.DealState } // DealStateChangedForIDs detects changes in the deal state AMT for the given deal IDs -func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffAdtArraysFunc { - return func(ctx context.Context, oldDealStateArray, newDealStateArray *adt.Array) (changed bool, user UserData, err error) { +func (sp *StatePredicates) DealStateChangedForIDs(dealIds []abi.DealID) DiffDealStatesFunc { + return func(ctx context.Context, oldDealStates, newDealStates market.DealStates) (changed bool, user UserData, err error) { changedDeals := make(ChangedDeals) for _, dealID := range dealIds { - var oldDealPtr, newDealPtr *market.DealState - var oldDeal, newDeal market.DealState // If the deal has been removed, we just set it to nil - found, err := oldDealStateArray.Get(uint64(dealID), &oldDeal) + oldDeal, err := oldDealStates.GetDeal(dealID) if err != nil { return false, nil, err } - if found { - oldDealPtr = &oldDeal - } - found, err = newDealStateArray.Get(uint64(dealID), &newDeal) + newDeal, err := newDealStates.GetDeal(dealID) if err != nil { return false, nil, err } - if found { - newDealPtr = &newDeal - } - if oldDeal != newDeal { - changedDeals[dealID] = DealStateChange{dealID, oldDealPtr, newDealPtr} + existenceChanged := (oldDeal == nil) != (newDeal == nil) + valueChanged := (oldDeal != nil && newDeal != nil) && !oldDeal.Equals(newDeal) + if existenceChanged || valueChanged { + changedDeals[dealID] = DealStateChange{dealID, oldDeal, newDeal} } } if len(changedDeals) > 0 { @@ -441,7 +336,7 @@ type MinerSectorChanges struct { Removed []miner.SectorOnChainInfo } -var _ AdtArrayDiff = &MinerSectorChanges{} +var _ adt.AdtArrayDiff = &MinerSectorChanges{} type SectorExtensions struct { From miner.SectorOnChainInfo @@ -508,17 +403,17 @@ func (sp *StatePredicates) OnMinerSectorChange() DiffMinerActorStateFunc { return false, nil, nil } - oldSectors, err := adt.AsArray(ctxStore, oldState.Sectors) + oldSectors, err := v0adt.AsArray(ctxStore, oldState.Sectors) if err != nil { return false, nil, err } - newSectors, err := adt.AsArray(ctxStore, newState.Sectors) + newSectors, err := v0adt.AsArray(ctxStore, newState.Sectors) if err != nil { return false, nil, err } - if err := DiffAdtArray(oldSectors, newSectors, sectorChanges); err != nil { + if err := adt.DiffAdtArray(oldSectors, newSectors, sectorChanges); err != nil { return false, nil, err } @@ -584,17 +479,17 @@ func (sp *StatePredicates) OnMinerPreCommitChange() DiffMinerActorStateFunc { return false, nil, nil } - oldPrecommits, err := adt.AsMap(ctxStore, oldState.PreCommittedSectors) + oldPrecommits, err := v0adt.AsMap(ctxStore, oldState.PreCommittedSectors) if err != nil { return false, nil, err } - newPrecommits, err := adt.AsMap(ctxStore, newState.PreCommittedSectors) + newPrecommits, err := v0adt.AsMap(ctxStore, newState.PreCommittedSectors) if err != nil { return false, nil, err } - if err := DiffAdtMap(oldPrecommits, newPrecommits, precommitChanges); err != nil { + if err := adt.DiffAdtMap(oldPrecommits, newPrecommits, precommitChanges); err != nil { return false, nil, err } @@ -763,17 +658,17 @@ func (sp *StatePredicates) OnAddressMapChange() DiffInitActorStateFunc { return false, nil, nil } - oldAddrs, err := adt.AsMap(ctxStore, oldState.AddressMap) + oldAddrs, err := v0adt.AsMap(ctxStore, oldState.AddressMap) if err != nil { return false, nil, err } - newAddrs, err := adt.AsMap(ctxStore, newState.AddressMap) + newAddrs, err := v0adt.AsMap(ctxStore, newState.AddressMap) if err != nil { return false, nil, err } - if err := DiffAdtMap(oldAddrs, newAddrs, addressChanges); err != nil { + if err := adt.DiffAdtMap(oldAddrs, newAddrs, addressChanges); err != nil { return false, nil, err } diff --git a/chain/events/state/predicates_test.go b/chain/events/state/predicates_test.go index 7117a96cc4a..783c720edbd 100644 --- a/chain/events/state/predicates_test.go +++ b/chain/events/state/predicates_test.go @@ -16,7 +16,10 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/crypto" - "github.com/filecoin-project/specs-actors/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" + v0builtin "github.com/filecoin-project/specs-actors/actors/builtin" + v0market "github.com/filecoin-project/specs-actors/actors/builtin/market" + "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/util/adt" tutils "github.com/filecoin-project/specs-actors/support/testing" @@ -69,22 +72,22 @@ func TestMarketPredicates(t *testing.T) { bs := bstore.NewTemporarySync() store := adt.WrapStore(ctx, cbornode.NewCborStore(bs)) - oldDeal1 := &market.DealState{ + oldDeal1 := &v0market.DealState{ SectorStartEpoch: 1, LastUpdatedEpoch: 2, SlashEpoch: 0, } - oldDeal2 := &market.DealState{ + oldDeal2 := &v0market.DealState{ SectorStartEpoch: 4, LastUpdatedEpoch: 5, SlashEpoch: 0, } - oldDeals := map[abi.DealID]*market.DealState{ + oldDeals := map[abi.DealID]*v0market.DealState{ abi.DealID(1): oldDeal1, abi.DealID(2): oldDeal2, } - oldProp1 := &market.DealProposal{ + oldProp1 := &v0market.DealProposal{ PieceCID: dummyCid, PieceSize: 0, VerifiedDeal: false, @@ -96,7 +99,7 @@ func TestMarketPredicates(t *testing.T) { ProviderCollateral: big.Zero(), ClientCollateral: big.Zero(), } - oldProp2 := &market.DealProposal{ + oldProp2 := &v0market.DealProposal{ PieceCID: dummyCid, PieceSize: 0, VerifiedDeal: false, @@ -108,7 +111,7 @@ func TestMarketPredicates(t *testing.T) { ProviderCollateral: big.Zero(), ClientCollateral: big.Zero(), } - oldProps := map[abi.DealID]*market.DealProposal{ + oldProps := map[abi.DealID]*v0market.DealProposal{ abi.DealID(1): oldProp1, abi.DealID(2): oldProp2, } @@ -122,7 +125,7 @@ func TestMarketPredicates(t *testing.T) { oldStateC := createMarketState(ctx, t, store, oldDeals, oldProps, oldBalances) - newDeal1 := &market.DealState{ + newDeal1 := &v0market.DealState{ SectorStartEpoch: 1, LastUpdatedEpoch: 3, SlashEpoch: 0, @@ -131,19 +134,19 @@ func TestMarketPredicates(t *testing.T) { // deal 2 removed // added - newDeal3 := &market.DealState{ + newDeal3 := &v0market.DealState{ SectorStartEpoch: 1, LastUpdatedEpoch: 2, SlashEpoch: 3, } - newDeals := map[abi.DealID]*market.DealState{ + newDeals := map[abi.DealID]*v0market.DealState{ abi.DealID(1): newDeal1, // deal 2 was removed abi.DealID(3): newDeal3, } // added - newProp3 := &market.DealProposal{ + newProp3 := &v0market.DealProposal{ PieceCID: dummyCid, PieceSize: 0, VerifiedDeal: false, @@ -155,7 +158,7 @@ func TestMarketPredicates(t *testing.T) { ProviderCollateral: big.Zero(), ClientCollateral: big.Zero(), } - newProps := map[abi.DealID]*market.DealProposal{ + newProps := map[abi.DealID]*v0market.DealProposal{ abi.DealID(1): oldProp1, // 1 was persisted // prop 2 was removed abi.DealID(3): newProp3, // new @@ -178,8 +181,8 @@ func TestMarketPredicates(t *testing.T) { require.NoError(t, err) api := newMockAPI(bs) - api.setActor(oldState.Key(), &types.Actor{Head: oldStateC}) - api.setActor(newState.Key(), &types.Actor{Head: newStateC}) + api.setActor(oldState.Key(), &types.Actor{Code: v0builtin.StorageMarketActorCodeID, Head: oldStateC}) + api.setActor(newState.Key(), &types.Actor{Code: v0builtin.StorageMarketActorCodeID, Head: newStateC}) t.Run("deal ID predicate", func(t *testing.T) { preds := NewStatePredicates(api) @@ -203,11 +206,11 @@ func TestMarketPredicates(t *testing.T) { require.Contains(t, changedDealIDs, abi.DealID(1)) require.Contains(t, changedDealIDs, abi.DealID(2)) deal1 := changedDealIDs[abi.DealID(1)] - if deal1.From.LastUpdatedEpoch != 2 || deal1.To.LastUpdatedEpoch != 3 { + if deal1.From.LastUpdatedEpoch() != 2 || deal1.To.LastUpdatedEpoch() != 3 { t.Fatal("Unexpected change to LastUpdatedEpoch") } deal2 := changedDealIDs[abi.DealID(2)] - if deal2.From.LastUpdatedEpoch != 5 || deal2.To != nil { + if deal2.From.LastUpdatedEpoch() != 5 || deal2.To != nil { t.Fatal("Expected To to be nil") } @@ -230,11 +233,17 @@ func TestMarketPredicates(t *testing.T) { require.False(t, changed) // Test that OnDealStateChanged does not call the callback if the state has not changed - diffDealStateFn := preds.OnDealStateChanged(func(context.Context, *adt.Array, *adt.Array) (bool, UserData, error) { + diffDealStateFn := preds.OnDealStateChanged(func(context.Context, market.DealStates, market.DealStates) (bool, UserData, error) { t.Fatal("No state change so this should not be called") return false, nil, nil }) - marketState := createEmptyMarketState(t, store) + v0marketState := createEmptyMarketState(t, store) + marketCid, err := store.Put(ctx, v0marketState) + require.NoError(t, err) + marketState, err := market.Load(store, &types.Actor{ + Code: v0builtin.StorageMarketActorCodeID, + Head: marketCid, + }) changed, _, err = diffDealStateFn(ctx, marketState, marketState) require.NoError(t, err) require.False(t, changed) @@ -252,18 +261,18 @@ func TestMarketPredicates(t *testing.T) { require.NoError(t, err) require.True(t, changed) - changedDeals, ok := valArr.(*MarketDealStateChanges) + changedDeals, ok := valArr.(*market.DealStateChanges) require.True(t, ok) require.Len(t, changedDeals.Added, 1) require.Equal(t, abi.DealID(3), changedDeals.Added[0].ID) - require.Equal(t, *newDeal3, changedDeals.Added[0].Deal) + require.True(t, dealEquality(*newDeal3, changedDeals.Added[0].Deal)) require.Len(t, changedDeals.Removed, 1) require.Len(t, changedDeals.Modified, 1) require.Equal(t, abi.DealID(1), changedDeals.Modified[0].ID) - require.Equal(t, newDeal1, changedDeals.Modified[0].To) - require.Equal(t, oldDeal1, changedDeals.Modified[0].From) + require.True(t, dealEquality(*newDeal1, changedDeals.Modified[0].To)) + require.True(t, dealEquality(*oldDeal1, changedDeals.Modified[0].From)) require.Equal(t, abi.DealID(2), changedDeals.Removed[0].ID) }) @@ -279,17 +288,15 @@ func TestMarketPredicates(t *testing.T) { require.NoError(t, err) require.True(t, changed) - changedProps, ok := valArr.(*MarketDealProposalChanges) + changedProps, ok := valArr.(*market.DealProposalChanges) require.True(t, ok) require.Len(t, changedProps.Added, 1) require.Equal(t, abi.DealID(3), changedProps.Added[0].ID) - require.Equal(t, *newProp3, changedProps.Added[0].Proposal) // proposals cannot be modified -- no modified testing require.Len(t, changedProps.Removed, 1) require.Equal(t, abi.DealID(2), changedProps.Removed[0].ID) - require.Equal(t, *oldProp2, changedProps.Removed[0].Proposal) }) t.Run("balances predicate", func(t *testing.T) { @@ -342,7 +349,13 @@ func TestMarketPredicates(t *testing.T) { t.Fatal("No state change so this should not be called") return false, nil, nil }) - marketState := createEmptyMarketState(t, store) + v0marketState := createEmptyMarketState(t, store) + marketCid, err := store.Put(ctx, v0marketState) + require.NoError(t, err) + marketState, err := market.Load(store, &types.Actor{ + Code: v0builtin.StorageMarketActorCodeID, + Head: marketCid, + }) changed, _, err = diffDealBalancesFn(ctx, marketState, marketState) require.NoError(t, err) require.False(t, changed) @@ -450,7 +463,7 @@ type balance struct { locked abi.TokenAmount } -func createMarketState(ctx context.Context, t *testing.T, store adt.Store, deals map[abi.DealID]*market.DealState, props map[abi.DealID]*market.DealProposal, balances map[address.Address]balance) cid.Cid { +func createMarketState(ctx context.Context, t *testing.T, store adt.Store, deals map[abi.DealID]*v0market.DealState, props map[abi.DealID]*v0market.DealProposal, balances map[address.Address]balance) cid.Cid { dealRootCid := createDealAMT(ctx, t, store, deals) propRootCid := createProposalAMT(ctx, t, store, props) balancesCids := createBalanceTable(ctx, t, store, balances) @@ -465,15 +478,15 @@ func createMarketState(ctx context.Context, t *testing.T, store adt.Store, deals return stateC } -func createEmptyMarketState(t *testing.T, store adt.Store) *market.State { +func createEmptyMarketState(t *testing.T, store adt.Store) *v0market.State { emptyArrayCid, err := adt.MakeEmptyArray(store).Root() require.NoError(t, err) emptyMap, err := adt.MakeEmptyMap(store).Root() require.NoError(t, err) - return market.ConstructState(emptyArrayCid, emptyMap, emptyMap) + return v0market.ConstructState(emptyArrayCid, emptyMap, emptyMap) } -func createDealAMT(ctx context.Context, t *testing.T, store adt.Store, deals map[abi.DealID]*market.DealState) cid.Cid { +func createDealAMT(ctx context.Context, t *testing.T, store adt.Store, deals map[abi.DealID]*v0market.DealState) cid.Cid { root := adt.MakeEmptyArray(store) for dealID, dealState := range deals { err := root.Set(uint64(dealID), dealState) @@ -484,7 +497,7 @@ func createDealAMT(ctx context.Context, t *testing.T, store adt.Store, deals map return rootCid } -func createProposalAMT(ctx context.Context, t *testing.T, store adt.Store, props map[abi.DealID]*market.DealProposal) cid.Cid { +func createProposalAMT(ctx context.Context, t *testing.T, store adt.Store, props map[abi.DealID]*v0market.DealProposal) cid.Cid { root := adt.MakeEmptyArray(store) for dealID, prop := range props { err := root.Set(uint64(dealID), prop) @@ -607,3 +620,9 @@ func newSectorPreCommitInfo(sectorNo abi.SectorNumber, sealed cid.Cid, expiratio Expiration: expiration, } } + +func dealEquality(expected v0market.DealState, actual market.DealState) bool { + return expected.LastUpdatedEpoch == actual.LastUpdatedEpoch() && + expected.SectorStartEpoch == actual.SectorStartEpoch() && + expected.SlashEpoch == actual.SlashEpoch() +} diff --git a/cmd/lotus-chainwatch/processor/market.go b/cmd/lotus-chainwatch/processor/market.go index e50ec3076ec..a4bae4b2026 100644 --- a/cmd/lotus-chainwatch/processor/market.go +++ b/cmd/lotus-chainwatch/processor/market.go @@ -8,6 +8,7 @@ import ( "golang.org/x/sync/errgroup" "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/events/state" ) @@ -293,14 +294,14 @@ func (p *Processor) updateMarketActorDealProposals(ctx context.Context, marketTi if !changed { continue } - changes, ok := val.(*state.MarketDealStateChanges) + changes, ok := val.(*market.DealStateChanges) if !ok { return xerrors.Errorf("Unknown type returned by Deal State AMT predicate: %T", val) } for _, modified := range changes.Modified { - if modified.From.SlashEpoch != modified.To.SlashEpoch { - if _, err := stmt.Exec(modified.To.SlashEpoch, modified.ID); err != nil { + if modified.From.SlashEpoch() != modified.To.SlashEpoch() { + if _, err := stmt.Exec(modified.To.SlashEpoch(), modified.ID); err != nil { return err } }