diff --git a/chain/actors/builtin/market/actor.go.template b/chain/actors/builtin/market/actor.go.template index 90ba85a2..396615ef 100644 --- a/chain/actors/builtin/market/actor.go.template +++ b/chain/actors/builtin/market/actor.go.template @@ -75,6 +75,8 @@ type State interface { DealProposalsAmtBitwidth() int DealStatesAmtBitwidth() int + + GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) } type BalanceTable interface { diff --git a/chain/actors/builtin/market/market.go b/chain/actors/builtin/market/market.go index d8ef085d..02e93b17 100644 --- a/chain/actors/builtin/market/market.go +++ b/chain/actors/builtin/market/market.go @@ -105,6 +105,8 @@ type State interface { DealProposalsAmtBitwidth() int DealStatesAmtBitwidth() int + + GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) } type BalanceTable interface { diff --git a/chain/actors/builtin/market/state.go.template b/chain/actors/builtin/market/state.go.template index b5436851..3e1f70f0 100644 --- a/chain/actors/builtin/market/state.go.template +++ b/chain/actors/builtin/market/state.go.template @@ -306,4 +306,44 @@ func (s *state{{.v}}) Code() cid.Cid { } return markettypes.NewLabelFromBytes(bs) } -{{end}} \ No newline at end of file +{{end}} + +func (s *state{{.v}}) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + {{if (le .v 12)}} + return nil, nil + {{else}} + sectorDeals, err := adt{{.v}}.AsMap(s.store, s.State.ProviderSectors, market{{.v}}.ProviderSectorsHamtBitwidth) + if err != nil { + return nil, err + } + var sectorMapRoot cbg.CborCid + providerSectors := make(map[abi.SectorID][]abi.DealID) + err = sectorDeals.ForEach(§orMapRoot, func(providerID string) error { + provider, err := abi.ParseUIntKey(providerID) + if err != nil { + return nil + } + + sectorMap, err := adt{{.v}}.AsMap(s.store, cid.Cid(sectorMapRoot), market{{.v}}.ProviderSectorsHamtBitwidth) + if err != nil { + return err + } + + var dealIDs market{{.v}}.SectorDealIDs + err = sectorMap.ForEach(&dealIDs, func(sectorID string) error { + sectorNumber, err := abi.ParseUIntKey(sectorID) + if err != nil { + return err + } + + dealIDsCopy := make([]abi.DealID, len(dealIDs)) + copy(dealIDsCopy, dealIDs) + + providerSectors[abi.SectorID{Miner: abi.ActorID(provider), Number: abi.SectorNumber(sectorNumber)}] = dealIDsCopy + return nil + }) + return err + }) + return providerSectors, err + {{end}} + } \ No newline at end of file diff --git a/chain/actors/builtin/market/v0.go b/chain/actors/builtin/market/v0.go index b51bbe81..73605e68 100644 --- a/chain/actors/builtin/market/v0.go +++ b/chain/actors/builtin/market/v0.go @@ -269,3 +269,9 @@ func (s *state0) Code() cid.Cid { return code } + +func (s *state0) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v10.go b/chain/actors/builtin/market/v10.go index 520a7464..60b1b67c 100644 --- a/chain/actors/builtin/market/v10.go +++ b/chain/actors/builtin/market/v10.go @@ -281,3 +281,9 @@ func fromV10Label(v10 market10.DealLabel) (DealLabel, error) { } return markettypes.NewLabelFromBytes(bs) } + +func (s *state10) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v11.go b/chain/actors/builtin/market/v11.go index a868a062..dddb7d35 100644 --- a/chain/actors/builtin/market/v11.go +++ b/chain/actors/builtin/market/v11.go @@ -281,3 +281,9 @@ func fromV11Label(v11 market11.DealLabel) (DealLabel, error) { } return markettypes.NewLabelFromBytes(bs) } + +func (s *state11) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v12.go b/chain/actors/builtin/market/v12.go index 34d47ea4..0447f4f7 100644 --- a/chain/actors/builtin/market/v12.go +++ b/chain/actors/builtin/market/v12.go @@ -281,3 +281,9 @@ func fromV12Label(v12 market12.DealLabel) (DealLabel, error) { } return markettypes.NewLabelFromBytes(bs) } + +func (s *state12) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v13.go b/chain/actors/builtin/market/v13.go index 7b437447..c76ec1c0 100644 --- a/chain/actors/builtin/market/v13.go +++ b/chain/actors/builtin/market/v13.go @@ -281,3 +281,41 @@ func fromV13Label(v13 market13.DealLabel) (DealLabel, error) { } return markettypes.NewLabelFromBytes(bs) } + +func (s *state13) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + sectorDeals, err := adt13.AsMap(s.store, s.State.ProviderSectors, market13.ProviderSectorsHamtBitwidth) + if err != nil { + return nil, err + } + var sectorMapRoot cbg.CborCid + providerSectors := make(map[abi.SectorID][]abi.DealID) + err = sectorDeals.ForEach(§orMapRoot, func(providerID string) error { + provider, err := abi.ParseUIntKey(providerID) + if err != nil { + return nil + } + + sectorMap, err := adt13.AsMap(s.store, cid.Cid(sectorMapRoot), market13.ProviderSectorsHamtBitwidth) + if err != nil { + return err + } + + var dealIDs market13.SectorDealIDs + err = sectorMap.ForEach(&dealIDs, func(sectorID string) error { + sectorNumber, err := abi.ParseUIntKey(sectorID) + if err != nil { + return err + } + + dealIDsCopy := make([]abi.DealID, len(dealIDs)) + copy(dealIDsCopy, dealIDs) + + providerSectors[abi.SectorID{Miner: abi.ActorID(provider), Number: abi.SectorNumber(sectorNumber)}] = dealIDsCopy + return nil + }) + return err + }) + return providerSectors, err + +} diff --git a/chain/actors/builtin/market/v2.go b/chain/actors/builtin/market/v2.go index fa848fe7..a0a9e9d7 100644 --- a/chain/actors/builtin/market/v2.go +++ b/chain/actors/builtin/market/v2.go @@ -269,3 +269,9 @@ func (s *state2) Code() cid.Cid { return code } + +func (s *state2) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v3.go b/chain/actors/builtin/market/v3.go index fc1b236b..ab217c3a 100644 --- a/chain/actors/builtin/market/v3.go +++ b/chain/actors/builtin/market/v3.go @@ -264,3 +264,9 @@ func (s *state3) Code() cid.Cid { return code } + +func (s *state3) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v4.go b/chain/actors/builtin/market/v4.go index fd229560..afb77f7f 100644 --- a/chain/actors/builtin/market/v4.go +++ b/chain/actors/builtin/market/v4.go @@ -264,3 +264,9 @@ func (s *state4) Code() cid.Cid { return code } + +func (s *state4) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v5.go b/chain/actors/builtin/market/v5.go index 317e38fe..9da7c2ce 100644 --- a/chain/actors/builtin/market/v5.go +++ b/chain/actors/builtin/market/v5.go @@ -264,3 +264,9 @@ func (s *state5) Code() cid.Cid { return code } + +func (s *state5) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v6.go b/chain/actors/builtin/market/v6.go index a635e2b1..42cc317b 100644 --- a/chain/actors/builtin/market/v6.go +++ b/chain/actors/builtin/market/v6.go @@ -264,3 +264,9 @@ func (s *state6) Code() cid.Cid { return code } + +func (s *state6) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v7.go b/chain/actors/builtin/market/v7.go index 1bca6871..57840b0d 100644 --- a/chain/actors/builtin/market/v7.go +++ b/chain/actors/builtin/market/v7.go @@ -264,3 +264,9 @@ func (s *state7) Code() cid.Cid { return code } + +func (s *state7) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v8.go b/chain/actors/builtin/market/v8.go index c91a1201..c356368d 100644 --- a/chain/actors/builtin/market/v8.go +++ b/chain/actors/builtin/market/v8.go @@ -281,3 +281,9 @@ func fromV8Label(v8 market8.DealLabel) (DealLabel, error) { } return markettypes.NewLabelFromBytes(bs) } + +func (s *state8) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v9.go b/chain/actors/builtin/market/v9.go index 80d93ca1..cd7b8766 100644 --- a/chain/actors/builtin/market/v9.go +++ b/chain/actors/builtin/market/v9.go @@ -281,3 +281,9 @@ func fromV9Label(v9 market9.DealLabel) (DealLabel, error) { } return markettypes.NewLabelFromBytes(bs) } + +func (s *state9) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/indexer/integrated/processor/state.go b/chain/indexer/integrated/processor/state.go index f7b38794..21bd0924 100644 --- a/chain/indexer/integrated/processor/state.go +++ b/chain/indexer/integrated/processor/state.go @@ -695,6 +695,11 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces marketactors.AllCodes(), markettask.DealProposalExtractor{}, )) + case tasktype.MinerSectorDealV2: + out.ActorProcessors[t] = actorstate.NewTask(api, actorstate.NewTypedActorExtractorMap( + marketactors.AllCodes(), + markettask.SectorDealStateExtractor{}, + )) // // Multisig diff --git a/chain/indexer/integrated/processor/state_internal_test.go b/chain/indexer/integrated/processor/state_internal_test.go index 0654cb32..9e955b6f 100644 --- a/chain/indexer/integrated/processor/state_internal_test.go +++ b/chain/indexer/integrated/processor/state_internal_test.go @@ -50,7 +50,7 @@ func TestNewProcessor(t *testing.T) { proc, err := New(nil, t.Name(), tasktype.AllTableTasks) require.NoError(t, err) require.Equal(t, t.Name(), proc.name) - require.Len(t, proc.actorProcessors, 25) + require.Len(t, proc.actorProcessors, 26) require.Len(t, proc.tipsetProcessors, 10) require.Len(t, proc.tipsetsProcessors, 15) require.Len(t, proc.builtinProcessors, 1) diff --git a/chain/indexer/integrated/processor/state_test.go b/chain/indexer/integrated/processor/state_test.go index 78aa6204..84eec88b 100644 --- a/chain/indexer/integrated/processor/state_test.go +++ b/chain/indexer/integrated/processor/state_test.go @@ -439,7 +439,7 @@ func TestMakeProcessorsAllTasks(t *testing.T) { // If this test fails it indicates a new processor and/or task name was added and test should be created for it in one of the above test cases. proc, err := processor.MakeProcessors(nil, append(tasktype.AllTableTasks, processor.BuiltinTaskName)) require.NoError(t, err) - require.Len(t, proc.ActorProcessors, 25) + require.Len(t, proc.ActorProcessors, 26) require.Len(t, proc.TipsetProcessors, 10) require.Len(t, proc.TipsetsProcessors, 15) require.Len(t, proc.ReportProcessors, 1) diff --git a/chain/indexer/tasktype/table_tasks.go b/chain/indexer/tasktype/table_tasks.go index f5b39e3d..f0d0d796 100644 --- a/chain/indexer/tasktype/table_tasks.go +++ b/chain/indexer/tasktype/table_tasks.go @@ -8,6 +8,7 @@ const ( DataCapBalance = "data_cap_balance" MinerBeneficiary = "miner_beneficiary" MinerSectorDeal = "miner_sector_deal" + MinerSectorDealV2 = "miner_sector_deal_v2" MinerSectorInfoV7 = "miner_sector_infos_v7" MinerSectorInfoV1_6 = "miner_sector_infos" MinerSectorPost = "miner_sector_post" @@ -109,6 +110,7 @@ var AllTableTasks = []string{ FEVMActorDump, MinerActorDump, BuiltInActorEvent, + MinerSectorDealV2, } var TableLookup = map[string]struct{}{ @@ -164,6 +166,7 @@ var TableLookup = map[string]struct{}{ FEVMActorDump: {}, MinerActorDump: {}, BuiltInActorEvent: {}, + MinerSectorDealV2: {}, } var TableComment = map[string]string{ @@ -219,6 +222,7 @@ var TableComment = map[string]string{ FEVMActorDump: ``, MinerActorDump: ``, BuiltInActorEvent: ``, + MinerSectorDealV2: ``, } var TableFieldComments = map[string]map[string]string{ @@ -447,4 +451,5 @@ var TableFieldComments = map[string]map[string]string{ "TotalLockedFunds": "Locked Funds", }, BuiltInActorEvent: {}, + MinerSectorDealV2: {}, } diff --git a/chain/indexer/tasktype/tasks.go b/chain/indexer/tasktype/tasks.go index 45081cbe..4a76a363 100644 --- a/chain/indexer/tasktype/tasks.go +++ b/chain/indexer/tasktype/tasks.go @@ -53,6 +53,7 @@ var TaskLookup = map[string][]string{ ActorStatesMarketTask: { MarketDealProposal, MarketDealState, + MinerSectorDealV2, }, ActorStatesMultisigTask: { MultisigTransaction, diff --git a/chain/indexer/tasktype/tasks_test.go b/chain/indexer/tasktype/tasks_test.go index aa1a8a7a..78f95245 100644 --- a/chain/indexer/tasktype/tasks_test.go +++ b/chain/indexer/tasktype/tasks_test.go @@ -39,7 +39,7 @@ func TestMakeTaskNamesAlias(t *testing.T) { }, { taskAlias: tasktype.ActorStatesMarketTask, - tasks: []string{tasktype.MarketDealProposal, tasktype.MarketDealState}, + tasks: []string{tasktype.MarketDealProposal, tasktype.MarketDealState, tasktype.MinerSectorDealV2}, }, { taskAlias: tasktype.ActorStatesMultisigTask, @@ -102,7 +102,7 @@ func TestMakeAllTaskAliasNames(t *testing.T) { } func TestMakeAllTaskNames(t *testing.T) { - const TotalTableTasks = 52 + const TotalTableTasks = 53 actual, err := tasktype.MakeTaskNames(tasktype.AllTableTasks) require.NoError(t, err) // if this test fails it means a new task name was added, update the above test diff --git a/model/actors/miner/sectordealsV2.go b/model/actors/miner/sectordealsV2.go new file mode 100644 index 00000000..90fcc78c --- /dev/null +++ b/model/actors/miner/sectordealsV2.go @@ -0,0 +1,45 @@ +package miner + +import ( + "context" + + "go.opencensus.io/tag" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + + "github.com/filecoin-project/lily/metrics" + "github.com/filecoin-project/lily/model" +) + +type MinerSectorDealV2 struct { + tableName struct{} `pg:"miner_sector_deals_v2"` // nolint: structcheck + + Height int64 `pg:",pk,notnull,use_zero"` + MinerID string `pg:",pk,notnull"` + SectorID uint64 `pg:",pk,use_zero"` + DealID uint64 `pg:",pk,use_zero"` +} + +func (ds *MinerSectorDealV2) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_sector_deals_v2")) + metrics.RecordCount(ctx, metrics.PersistModel, 1) + return s.PersistModel(ctx, ds) +} + +type MinerSectorDealListV2 []*MinerSectorDealV2 + +func (ml MinerSectorDealListV2) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error { + ctx, span := otel.Tracer("").Start(ctx, "MinerSectorDealListV2.Persist") + if span.IsRecording() { + span.SetAttributes(attribute.Int("count", len(ml))) + } + defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_sector_deals_v2")) + + if len(ml) == 0 { + return nil + } + metrics.RecordCount(ctx, metrics.PersistModel, len(ml)) + return s.PersistModel(ctx, ml) +} diff --git a/schemas/v1/38_miner_sector_info_v2.go b/schemas/v1/38_miner_sector_info_v2.go new file mode 100644 index 00000000..76fe6481 --- /dev/null +++ b/schemas/v1/38_miner_sector_info_v2.go @@ -0,0 +1,19 @@ +package v1 + +func init() { + patches.Register( + 38, + ` + CREATE TABLE {{ .SchemaName | default "public"}}.miner_sector_deals_v2 ( + miner_id text NOT NULL, + sector_id bigint NOT NULL, + deal_id bigint NOT NULL, + height bigint NOT NULL + ); + ALTER TABLE ONLY {{ .SchemaName | default "public"}}.miner_sector_deals_v2 ADD CONSTRAINT miner_sector_deals_v2_pkey PRIMARY KEY (height, miner_id, sector_id, deal_id); + CREATE INDEX IF NOT EXISTS miner_sector_deals_height_idx ON {{ .SchemaName | default "public"}}.miner_sector_deals_v2 USING btree (height DESC); + CREATE INDEX IF NOT EXISTS miner_sector_deals_miner_id_idx ON {{ .SchemaName | default "public"}}.miner_sector_deals_v2 USING hash (miner_id); + CREATE INDEX IF NOT EXISTS miner_sector_deals_sector_id_idx ON {{ .SchemaName | default "public"}}.miner_sector_deals_v2 USING hash (sector_id); +`, + ) +} diff --git a/storage/sql.go b/storage/sql.go index f99354d0..6f0823cb 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -58,6 +58,7 @@ var Models = []interface{}{ (*miner.MinerFeeDebt)(nil), (*miner.MinerLockedFund)(nil), (*miner.MinerInfo)(nil), + (*miner.MinerSectorDealV2)(nil), (*market.MarketDealProposal)(nil), (*market.MarketDealState)(nil), diff --git a/tasks/actorstate/market/sector_deals_v2.go b/tasks/actorstate/market/sector_deals_v2.go new file mode 100644 index 00000000..fca949eb --- /dev/null +++ b/tasks/actorstate/market/sector_deals_v2.go @@ -0,0 +1,108 @@ +package market + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel" + "go.uber.org/zap" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lily/chain/actors/builtin/market" + "github.com/filecoin-project/lily/model" + marketmodel "github.com/filecoin-project/lily/model/actors/market" + "github.com/filecoin-project/lily/model/actors/miner" + "github.com/filecoin-project/lily/tasks/actorstate" +) + +var _ actorstate.ActorStateExtractor = (*SectorDealStateExtractor)(nil) + +type SectorDealStateExtractor struct{} + +func (SectorDealStateExtractor) Extract(ctx context.Context, a actorstate.ActorInfo, node actorstate.ActorStateAPI) (model.Persistable, error) { + log.Debugw("extract", zap.String("extractor", "SectorDealStateExtractor"), zap.Inline(a)) + ctx, span := otel.Tracer("").Start(ctx, "SectorDealStateExtractor.Extract") + defer span.End() + if span.IsRecording() { + span.SetAttributes(a.Attributes()...) + } + + ec, err := NewMarketStateExtractionContext(ctx, a, node) + if err != nil { + return nil, err + } + + currDealStates, err := ec.CurrState.States() + if err != nil { + return nil, fmt.Errorf("loading current market deal states: %w", err) + } + + if ec.IsGenesis() { + var out marketmodel.MarketDealStates + if err := currDealStates.ForEach(func(id abi.DealID, ds market.DealState) error { + out = append(out, &marketmodel.MarketDealState{ + Height: int64(ec.CurrTs.Height()), + DealID: uint64(id), + SectorStartEpoch: int64(ds.SectorStartEpoch()), + LastUpdateEpoch: int64(ds.LastUpdatedEpoch()), + SlashEpoch: int64(ds.SlashEpoch()), + StateRoot: ec.CurrTs.ParentState().String(), + }) + return nil + }); err != nil { + return nil, fmt.Errorf("walking current deal states: %w", err) + } + return out, nil + } + + changed, err := ec.CurrState.StatesChanged(ec.PrevState) + if err != nil { + return nil, fmt.Errorf("checking for deal state changes: %w", err) + } + + if !changed { + return nil, nil + } + + result, err := ec.CurrState.GetProviderSectors() + if err != nil { + log.Errorf("Get the errors during getting provider sectors: %v", err) + return nil, nil + } + dealSectorMap := make(map[abi.DealID]abi.SectorID) + for sectorID, dealIDs := range result { + for _, dealID := range dealIDs { + dealSectorMap[dealID] = sectorID + } + } + + changes, err := market.DiffDealStates(ctx, ec.Store, ec.PrevState, ec.CurrState) + if err != nil { + return nil, fmt.Errorf("diffing deal states: %w", err) + } + + out := make(miner.MinerSectorDealListV2, 0) + for _, add := range changes.Added { + sector, exists := dealSectorMap[add.ID] + if exists { + out = append(out, &miner.MinerSectorDealV2{ + Height: int64(ec.CurrTs.Height()), + DealID: uint64(add.ID), + SectorID: uint64(dealSectorMap[add.ID].Number), + MinerID: sector.Miner.String(), + }) + } + } + for _, mod := range changes.Modified { + sector, exists := dealSectorMap[mod.ID] + if exists { + out = append(out, &miner.MinerSectorDealV2{ + Height: int64(ec.CurrTs.Height()), + DealID: uint64(mod.ID), + SectorID: uint64(dealSectorMap[mod.ID].Number), + MinerID: sector.Miner.String(), + }) + } + } + return out, nil +} diff --git a/tasks/messages/builtinactorevent/task.go b/tasks/messages/builtinactorevent/task.go index 66e22a4d..0191fda7 100644 --- a/tasks/messages/builtinactorevent/task.go +++ b/tasks/messages/builtinactorevent/task.go @@ -2,17 +2,11 @@ package builtinactorevent import ( "context" - "fmt" - "strconv" - b64 "encoding/base64" "encoding/json" + "fmt" + "strconv" - "github.com/filecoin-project/lily/model" - "github.com/filecoin-project/lily/model/actors/builtinactor" - visormodel "github.com/filecoin-project/lily/model/visor" - "github.com/filecoin-project/lily/tasks" - "github.com/filecoin-project/lotus/chain/types" "github.com/fxamacker/cbor/v2" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" @@ -21,6 +15,13 @@ import ( "github.com/ipld/go-ipld-prime/node/bindnode" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + + "github.com/filecoin-project/lily/model" + "github.com/filecoin-project/lily/model/actors/builtinactor" + visormodel "github.com/filecoin-project/lily/model/visor" + "github.com/filecoin-project/lily/tasks" + + "github.com/filecoin-project/lotus/chain/types" ) var log = logging.Logger("lily/tasks/builtinactorevent")