From 0d0da1915917db05dce4390db5cfe2fb5124e967 Mon Sep 17 00:00:00 2001 From: "terry.hung" Date: Thu, 25 Apr 2024 17:20:58 +0800 Subject: [PATCH 1/6] Add new task: miner_sector_deal_v2 --- chain/actors/builtin/market/actor.go.template | 2 + chain/actors/builtin/market/market.go | 2 + chain/actors/builtin/market/state.go.template | 42 ++++++- chain/actors/builtin/market/v0.go | 6 + chain/actors/builtin/market/v10.go | 6 + chain/actors/builtin/market/v11.go | 6 + chain/actors/builtin/market/v12.go | 6 + chain/actors/builtin/market/v13.go | 38 +++++++ chain/actors/builtin/market/v2.go | 6 + chain/actors/builtin/market/v3.go | 6 + chain/actors/builtin/market/v4.go | 6 + chain/actors/builtin/market/v5.go | 6 + chain/actors/builtin/market/v6.go | 6 + chain/actors/builtin/market/v7.go | 6 + chain/actors/builtin/market/v8.go | 6 + chain/actors/builtin/market/v9.go | 6 + chain/indexer/integrated/processor/state.go | 5 + chain/indexer/tasktype/table_tasks.go | 5 + chain/indexer/tasktype/tasks.go | 1 + chain/indexer/tasktype/tasks_test.go | 2 +- model/actors/miner/sectordealsV2.go | 45 ++++++++ tasks/actorstate/market/sector_deals_v2.go | 104 ++++++++++++++++++ tasks/messages/builtinactorevent/task.go | 17 +-- 23 files changed, 325 insertions(+), 10 deletions(-) create mode 100644 model/actors/miner/sectordealsV2.go create mode 100644 tasks/actorstate/market/sector_deals_v2.go diff --git a/chain/actors/builtin/market/actor.go.template b/chain/actors/builtin/market/actor.go.template index 90ba85a2e..396615efe 100644 --- a/chain/actors/builtin/market/actor.go.template +++ b/chain/actors/builtin/market/actor.go.template @@ -75,6 +75,8 @@ type State interface { DealProposalsAmtBitwidth() int DealStatesAmtBitwidth() int + + GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) } type BalanceTable interface { diff --git a/chain/actors/builtin/market/market.go b/chain/actors/builtin/market/market.go index d8ef085d8..02e93b17c 100644 --- a/chain/actors/builtin/market/market.go +++ b/chain/actors/builtin/market/market.go @@ -105,6 +105,8 @@ type State interface { DealProposalsAmtBitwidth() int DealStatesAmtBitwidth() int + + GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) } type BalanceTable interface { diff --git a/chain/actors/builtin/market/state.go.template b/chain/actors/builtin/market/state.go.template index b54368515..3e1f70f06 100644 --- a/chain/actors/builtin/market/state.go.template +++ b/chain/actors/builtin/market/state.go.template @@ -306,4 +306,44 @@ func (s *state{{.v}}) Code() cid.Cid { } return markettypes.NewLabelFromBytes(bs) } -{{end}} \ No newline at end of file +{{end}} + +func (s *state{{.v}}) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + {{if (le .v 12)}} + return nil, nil + {{else}} + sectorDeals, err := adt{{.v}}.AsMap(s.store, s.State.ProviderSectors, market{{.v}}.ProviderSectorsHamtBitwidth) + if err != nil { + return nil, err + } + var sectorMapRoot cbg.CborCid + providerSectors := make(map[abi.SectorID][]abi.DealID) + err = sectorDeals.ForEach(§orMapRoot, func(providerID string) error { + provider, err := abi.ParseUIntKey(providerID) + if err != nil { + return nil + } + + sectorMap, err := adt{{.v}}.AsMap(s.store, cid.Cid(sectorMapRoot), market{{.v}}.ProviderSectorsHamtBitwidth) + if err != nil { + return err + } + + var dealIDs market{{.v}}.SectorDealIDs + err = sectorMap.ForEach(&dealIDs, func(sectorID string) error { + sectorNumber, err := abi.ParseUIntKey(sectorID) + if err != nil { + return err + } + + dealIDsCopy := make([]abi.DealID, len(dealIDs)) + copy(dealIDsCopy, dealIDs) + + providerSectors[abi.SectorID{Miner: abi.ActorID(provider), Number: abi.SectorNumber(sectorNumber)}] = dealIDsCopy + return nil + }) + return err + }) + return providerSectors, err + {{end}} + } \ No newline at end of file diff --git a/chain/actors/builtin/market/v0.go b/chain/actors/builtin/market/v0.go index b51bbe819..73605e681 100644 --- a/chain/actors/builtin/market/v0.go +++ b/chain/actors/builtin/market/v0.go @@ -269,3 +269,9 @@ func (s *state0) Code() cid.Cid { return code } + +func (s *state0) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v10.go b/chain/actors/builtin/market/v10.go index 520a74649..60b1b67cc 100644 --- a/chain/actors/builtin/market/v10.go +++ b/chain/actors/builtin/market/v10.go @@ -281,3 +281,9 @@ func fromV10Label(v10 market10.DealLabel) (DealLabel, error) { } return markettypes.NewLabelFromBytes(bs) } + +func (s *state10) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v11.go b/chain/actors/builtin/market/v11.go index a868a0627..dddb7d35b 100644 --- a/chain/actors/builtin/market/v11.go +++ b/chain/actors/builtin/market/v11.go @@ -281,3 +281,9 @@ func fromV11Label(v11 market11.DealLabel) (DealLabel, error) { } return markettypes.NewLabelFromBytes(bs) } + +func (s *state11) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v12.go b/chain/actors/builtin/market/v12.go index 34d47ea47..0447f4f74 100644 --- a/chain/actors/builtin/market/v12.go +++ b/chain/actors/builtin/market/v12.go @@ -281,3 +281,9 @@ func fromV12Label(v12 market12.DealLabel) (DealLabel, error) { } return markettypes.NewLabelFromBytes(bs) } + +func (s *state12) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v13.go b/chain/actors/builtin/market/v13.go index 7b437447e..c76ec1c0c 100644 --- a/chain/actors/builtin/market/v13.go +++ b/chain/actors/builtin/market/v13.go @@ -281,3 +281,41 @@ func fromV13Label(v13 market13.DealLabel) (DealLabel, error) { } return markettypes.NewLabelFromBytes(bs) } + +func (s *state13) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + sectorDeals, err := adt13.AsMap(s.store, s.State.ProviderSectors, market13.ProviderSectorsHamtBitwidth) + if err != nil { + return nil, err + } + var sectorMapRoot cbg.CborCid + providerSectors := make(map[abi.SectorID][]abi.DealID) + err = sectorDeals.ForEach(§orMapRoot, func(providerID string) error { + provider, err := abi.ParseUIntKey(providerID) + if err != nil { + return nil + } + + sectorMap, err := adt13.AsMap(s.store, cid.Cid(sectorMapRoot), market13.ProviderSectorsHamtBitwidth) + if err != nil { + return err + } + + var dealIDs market13.SectorDealIDs + err = sectorMap.ForEach(&dealIDs, func(sectorID string) error { + sectorNumber, err := abi.ParseUIntKey(sectorID) + if err != nil { + return err + } + + dealIDsCopy := make([]abi.DealID, len(dealIDs)) + copy(dealIDsCopy, dealIDs) + + providerSectors[abi.SectorID{Miner: abi.ActorID(provider), Number: abi.SectorNumber(sectorNumber)}] = dealIDsCopy + return nil + }) + return err + }) + return providerSectors, err + +} diff --git a/chain/actors/builtin/market/v2.go b/chain/actors/builtin/market/v2.go index fa848fe73..a0a9e9d7b 100644 --- a/chain/actors/builtin/market/v2.go +++ b/chain/actors/builtin/market/v2.go @@ -269,3 +269,9 @@ func (s *state2) Code() cid.Cid { return code } + +func (s *state2) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v3.go b/chain/actors/builtin/market/v3.go index fc1b236bb..ab217c3af 100644 --- a/chain/actors/builtin/market/v3.go +++ b/chain/actors/builtin/market/v3.go @@ -264,3 +264,9 @@ func (s *state3) Code() cid.Cid { return code } + +func (s *state3) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v4.go b/chain/actors/builtin/market/v4.go index fd2295607..afb77f7fc 100644 --- a/chain/actors/builtin/market/v4.go +++ b/chain/actors/builtin/market/v4.go @@ -264,3 +264,9 @@ func (s *state4) Code() cid.Cid { return code } + +func (s *state4) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v5.go b/chain/actors/builtin/market/v5.go index 317e38fec..9da7c2ced 100644 --- a/chain/actors/builtin/market/v5.go +++ b/chain/actors/builtin/market/v5.go @@ -264,3 +264,9 @@ func (s *state5) Code() cid.Cid { return code } + +func (s *state5) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v6.go b/chain/actors/builtin/market/v6.go index a635e2b17..42cc317be 100644 --- a/chain/actors/builtin/market/v6.go +++ b/chain/actors/builtin/market/v6.go @@ -264,3 +264,9 @@ func (s *state6) Code() cid.Cid { return code } + +func (s *state6) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v7.go b/chain/actors/builtin/market/v7.go index 1bca6871a..57840b0d1 100644 --- a/chain/actors/builtin/market/v7.go +++ b/chain/actors/builtin/market/v7.go @@ -264,3 +264,9 @@ func (s *state7) Code() cid.Cid { return code } + +func (s *state7) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v8.go b/chain/actors/builtin/market/v8.go index c91a12018..c356368df 100644 --- a/chain/actors/builtin/market/v8.go +++ b/chain/actors/builtin/market/v8.go @@ -281,3 +281,9 @@ func fromV8Label(v8 market8.DealLabel) (DealLabel, error) { } return markettypes.NewLabelFromBytes(bs) } + +func (s *state8) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/actors/builtin/market/v9.go b/chain/actors/builtin/market/v9.go index 80d93ca14..cd7b87661 100644 --- a/chain/actors/builtin/market/v9.go +++ b/chain/actors/builtin/market/v9.go @@ -281,3 +281,9 @@ func fromV9Label(v9 market9.DealLabel) (DealLabel, error) { } return markettypes.NewLabelFromBytes(bs) } + +func (s *state9) GetProviderSectors() (map[abi.SectorID][]abi.DealID, error) { + + return nil, nil + +} diff --git a/chain/indexer/integrated/processor/state.go b/chain/indexer/integrated/processor/state.go index f7b38794a..21bd0924e 100644 --- a/chain/indexer/integrated/processor/state.go +++ b/chain/indexer/integrated/processor/state.go @@ -695,6 +695,11 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces marketactors.AllCodes(), markettask.DealProposalExtractor{}, )) + case tasktype.MinerSectorDealV2: + out.ActorProcessors[t] = actorstate.NewTask(api, actorstate.NewTypedActorExtractorMap( + marketactors.AllCodes(), + markettask.SectorDealStateExtractor{}, + )) // // Multisig diff --git a/chain/indexer/tasktype/table_tasks.go b/chain/indexer/tasktype/table_tasks.go index f5b39e3d4..f0d0d7961 100644 --- a/chain/indexer/tasktype/table_tasks.go +++ b/chain/indexer/tasktype/table_tasks.go @@ -8,6 +8,7 @@ const ( DataCapBalance = "data_cap_balance" MinerBeneficiary = "miner_beneficiary" MinerSectorDeal = "miner_sector_deal" + MinerSectorDealV2 = "miner_sector_deal_v2" MinerSectorInfoV7 = "miner_sector_infos_v7" MinerSectorInfoV1_6 = "miner_sector_infos" MinerSectorPost = "miner_sector_post" @@ -109,6 +110,7 @@ var AllTableTasks = []string{ FEVMActorDump, MinerActorDump, BuiltInActorEvent, + MinerSectorDealV2, } var TableLookup = map[string]struct{}{ @@ -164,6 +166,7 @@ var TableLookup = map[string]struct{}{ FEVMActorDump: {}, MinerActorDump: {}, BuiltInActorEvent: {}, + MinerSectorDealV2: {}, } var TableComment = map[string]string{ @@ -219,6 +222,7 @@ var TableComment = map[string]string{ FEVMActorDump: ``, MinerActorDump: ``, BuiltInActorEvent: ``, + MinerSectorDealV2: ``, } var TableFieldComments = map[string]map[string]string{ @@ -447,4 +451,5 @@ var TableFieldComments = map[string]map[string]string{ "TotalLockedFunds": "Locked Funds", }, BuiltInActorEvent: {}, + MinerSectorDealV2: {}, } diff --git a/chain/indexer/tasktype/tasks.go b/chain/indexer/tasktype/tasks.go index 45081cbe2..4a76a363e 100644 --- a/chain/indexer/tasktype/tasks.go +++ b/chain/indexer/tasktype/tasks.go @@ -53,6 +53,7 @@ var TaskLookup = map[string][]string{ ActorStatesMarketTask: { MarketDealProposal, MarketDealState, + MinerSectorDealV2, }, ActorStatesMultisigTask: { MultisigTransaction, diff --git a/chain/indexer/tasktype/tasks_test.go b/chain/indexer/tasktype/tasks_test.go index aa1a8a7a6..939f103c3 100644 --- a/chain/indexer/tasktype/tasks_test.go +++ b/chain/indexer/tasktype/tasks_test.go @@ -102,7 +102,7 @@ func TestMakeAllTaskAliasNames(t *testing.T) { } func TestMakeAllTaskNames(t *testing.T) { - const TotalTableTasks = 52 + const TotalTableTasks = 53 actual, err := tasktype.MakeTaskNames(tasktype.AllTableTasks) require.NoError(t, err) // if this test fails it means a new task name was added, update the above test diff --git a/model/actors/miner/sectordealsV2.go b/model/actors/miner/sectordealsV2.go new file mode 100644 index 000000000..6222c34e0 --- /dev/null +++ b/model/actors/miner/sectordealsV2.go @@ -0,0 +1,45 @@ +package miner + +import ( + "context" + + "go.opencensus.io/tag" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + + "github.com/filecoin-project/lily/metrics" + "github.com/filecoin-project/lily/model" +) + +type MinerSectorDealV2 struct { + tableName struct{} `pg:"miner_sector_deal_v2"` // nolint: structcheck + + Height int64 `pg:",pk,notnull,use_zero"` + MinerID string `pg:",pk,notnull"` + SectorID uint64 `pg:",pk,use_zero"` + DealID uint64 `pg:",pk,use_zero"` +} + +func (ds *MinerSectorDealV2) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error { + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_sector_deals_v2")) + metrics.RecordCount(ctx, metrics.PersistModel, 1) + return s.PersistModel(ctx, ds) +} + +type MinerSectorDealListV2 []*MinerSectorDealV2 + +func (ml MinerSectorDealListV2) Persist(ctx context.Context, s model.StorageBatch, _ model.Version) error { + ctx, span := otel.Tracer("").Start(ctx, "MinerSectorDealListV2.Persist") + if span.IsRecording() { + span.SetAttributes(attribute.Int("count", len(ml))) + } + defer span.End() + + ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "miner_sector_deals_v2")) + + if len(ml) == 0 { + return nil + } + metrics.RecordCount(ctx, metrics.PersistModel, len(ml)) + return s.PersistModel(ctx, ml) +} diff --git a/tasks/actorstate/market/sector_deals_v2.go b/tasks/actorstate/market/sector_deals_v2.go new file mode 100644 index 000000000..878a4ad1e --- /dev/null +++ b/tasks/actorstate/market/sector_deals_v2.go @@ -0,0 +1,104 @@ +package market + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel" + "go.uber.org/zap" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lily/chain/actors/builtin/market" + "github.com/filecoin-project/lily/model" + marketmodel "github.com/filecoin-project/lily/model/actors/market" + "github.com/filecoin-project/lily/model/actors/miner" + "github.com/filecoin-project/lily/tasks/actorstate" + logging "github.com/ipfs/go-log/v2" +) + +var _ actorstate.ActorStateExtractor = (*SectorDealStateExtractor)(nil) + +type SectorDealStateExtractor struct{} + +var logger = logging.Logger("lily/tasks/sector_deal_logger") + +func (SectorDealStateExtractor) Extract(ctx context.Context, a actorstate.ActorInfo, node actorstate.ActorStateAPI) (model.Persistable, error) { + log.Debugw("extract", zap.String("extractor", "SectorDealStateExtractor"), zap.Inline(a)) + ctx, span := otel.Tracer("").Start(ctx, "SectorDealStateExtractor.Extract") + defer span.End() + if span.IsRecording() { + span.SetAttributes(a.Attributes()...) + } + + ec, err := NewMarketStateExtractionContext(ctx, a, node) + if err != nil { + return nil, err + } + + currDealStates, err := ec.CurrState.States() + if err != nil { + return nil, fmt.Errorf("loading current market deal states: %w", err) + } + + if ec.IsGenesis() { + var out marketmodel.MarketDealStates + if err := currDealStates.ForEach(func(id abi.DealID, ds market.DealState) error { + out = append(out, &marketmodel.MarketDealState{ + Height: int64(ec.CurrTs.Height()), + DealID: uint64(id), + SectorStartEpoch: int64(ds.SectorStartEpoch()), + LastUpdateEpoch: int64(ds.LastUpdatedEpoch()), + SlashEpoch: int64(ds.SlashEpoch()), + StateRoot: ec.CurrTs.ParentState().String(), + }) + return nil + }); err != nil { + return nil, fmt.Errorf("walking current deal states: %w", err) + } + return out, nil + } + + changed, err := ec.CurrState.StatesChanged(ec.PrevState) + if err != nil { + return nil, fmt.Errorf("checking for deal state changes: %w", err) + } + + if !changed { + return nil, nil + } + + result, err := ec.CurrState.GetProviderSectors() + dealSectorMap := make(map[abi.DealID]abi.SectorID) + for sectorId, dealIDs := range result { + for _, dealID := range dealIDs { + dealSectorMap[dealID] = sectorId + } + } + + changes, err := market.DiffDealStates(ctx, ec.Store, ec.PrevState, ec.CurrState) + if err != nil { + return nil, fmt.Errorf("diffing deal states: %w", err) + } + + out := make(miner.MinerSectorDealListV2, len(changes.Added)+len(changes.Modified)) + idx := 0 + for _, add := range changes.Added { + out[idx] = &miner.MinerSectorDealV2{ + Height: int64(ec.CurrTs.Height()), + DealID: uint64(add.ID), + SectorID: uint64(dealSectorMap[add.ID].Number), + MinerID: dealSectorMap[add.ID].Miner.String(), + } + idx++ + } + for _, mod := range changes.Modified { + out[idx] = &miner.MinerSectorDealV2{ + Height: int64(ec.CurrTs.Height()), + DealID: uint64(mod.ID), + SectorID: uint64(dealSectorMap[mod.ID].Number), + MinerID: dealSectorMap[mod.ID].Miner.String(), + } + idx++ + } + return out, nil +} diff --git a/tasks/messages/builtinactorevent/task.go b/tasks/messages/builtinactorevent/task.go index 66e22a4d5..0191fda7e 100644 --- a/tasks/messages/builtinactorevent/task.go +++ b/tasks/messages/builtinactorevent/task.go @@ -2,17 +2,11 @@ package builtinactorevent import ( "context" - "fmt" - "strconv" - b64 "encoding/base64" "encoding/json" + "fmt" + "strconv" - "github.com/filecoin-project/lily/model" - "github.com/filecoin-project/lily/model/actors/builtinactor" - visormodel "github.com/filecoin-project/lily/model/visor" - "github.com/filecoin-project/lily/tasks" - "github.com/filecoin-project/lotus/chain/types" "github.com/fxamacker/cbor/v2" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" @@ -21,6 +15,13 @@ import ( "github.com/ipld/go-ipld-prime/node/bindnode" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + + "github.com/filecoin-project/lily/model" + "github.com/filecoin-project/lily/model/actors/builtinactor" + visormodel "github.com/filecoin-project/lily/model/visor" + "github.com/filecoin-project/lily/tasks" + + "github.com/filecoin-project/lotus/chain/types" ) var log = logging.Logger("lily/tasks/builtinactorevent") From c27be257321c3f537893911e9af91e3986677266 Mon Sep 17 00:00:00 2001 From: "terry.hung" Date: Thu, 25 Apr 2024 17:31:18 +0800 Subject: [PATCH 2/6] Add the new schema for miner_sector_info_v2 --- model/actors/miner/sectordealsV2.go | 2 +- schemas/v1/38_miner_sector_info_v2.go | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) create mode 100644 schemas/v1/38_miner_sector_info_v2.go diff --git a/model/actors/miner/sectordealsV2.go b/model/actors/miner/sectordealsV2.go index 6222c34e0..90fcc78c7 100644 --- a/model/actors/miner/sectordealsV2.go +++ b/model/actors/miner/sectordealsV2.go @@ -12,7 +12,7 @@ import ( ) type MinerSectorDealV2 struct { - tableName struct{} `pg:"miner_sector_deal_v2"` // nolint: structcheck + tableName struct{} `pg:"miner_sector_deals_v2"` // nolint: structcheck Height int64 `pg:",pk,notnull,use_zero"` MinerID string `pg:",pk,notnull"` diff --git a/schemas/v1/38_miner_sector_info_v2.go b/schemas/v1/38_miner_sector_info_v2.go new file mode 100644 index 000000000..76fe64816 --- /dev/null +++ b/schemas/v1/38_miner_sector_info_v2.go @@ -0,0 +1,19 @@ +package v1 + +func init() { + patches.Register( + 38, + ` + CREATE TABLE {{ .SchemaName | default "public"}}.miner_sector_deals_v2 ( + miner_id text NOT NULL, + sector_id bigint NOT NULL, + deal_id bigint NOT NULL, + height bigint NOT NULL + ); + ALTER TABLE ONLY {{ .SchemaName | default "public"}}.miner_sector_deals_v2 ADD CONSTRAINT miner_sector_deals_v2_pkey PRIMARY KEY (height, miner_id, sector_id, deal_id); + CREATE INDEX IF NOT EXISTS miner_sector_deals_height_idx ON {{ .SchemaName | default "public"}}.miner_sector_deals_v2 USING btree (height DESC); + CREATE INDEX IF NOT EXISTS miner_sector_deals_miner_id_idx ON {{ .SchemaName | default "public"}}.miner_sector_deals_v2 USING hash (miner_id); + CREATE INDEX IF NOT EXISTS miner_sector_deals_sector_id_idx ON {{ .SchemaName | default "public"}}.miner_sector_deals_v2 USING hash (sector_id); +`, + ) +} From 71af7752c41e92f2dc6bca9b4f82d737e7973d16 Mon Sep 17 00:00:00 2001 From: "terry.hung" Date: Thu, 25 Apr 2024 17:50:00 +0800 Subject: [PATCH 3/6] Add more check for getting sector id --- tasks/actorstate/market/sector_deals_v2.go | 31 ++++++++++++---------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/tasks/actorstate/market/sector_deals_v2.go b/tasks/actorstate/market/sector_deals_v2.go index 878a4ad1e..5321b34e3 100644 --- a/tasks/actorstate/market/sector_deals_v2.go +++ b/tasks/actorstate/market/sector_deals_v2.go @@ -80,25 +80,28 @@ func (SectorDealStateExtractor) Extract(ctx context.Context, a actorstate.ActorI return nil, fmt.Errorf("diffing deal states: %w", err) } - out := make(miner.MinerSectorDealListV2, len(changes.Added)+len(changes.Modified)) - idx := 0 + out := make(miner.MinerSectorDealListV2, 0) for _, add := range changes.Added { - out[idx] = &miner.MinerSectorDealV2{ - Height: int64(ec.CurrTs.Height()), - DealID: uint64(add.ID), - SectorID: uint64(dealSectorMap[add.ID].Number), - MinerID: dealSectorMap[add.ID].Miner.String(), + sector, exists := dealSectorMap[add.ID] + if exists { + out = append(out, &miner.MinerSectorDealV2{ + Height: int64(ec.CurrTs.Height()), + DealID: uint64(add.ID), + SectorID: uint64(dealSectorMap[add.ID].Number), + MinerID: sector.Miner.String(), + }) } - idx++ } for _, mod := range changes.Modified { - out[idx] = &miner.MinerSectorDealV2{ - Height: int64(ec.CurrTs.Height()), - DealID: uint64(mod.ID), - SectorID: uint64(dealSectorMap[mod.ID].Number), - MinerID: dealSectorMap[mod.ID].Miner.String(), + sector, exists := dealSectorMap[mod.ID] + if exists { + out = append(out, &miner.MinerSectorDealV2{ + Height: int64(ec.CurrTs.Height()), + DealID: uint64(mod.ID), + SectorID: uint64(dealSectorMap[mod.ID].Number), + MinerID: sector.Miner.String(), + }) } - idx++ } return out, nil } From 0b6ce827d15484b678b01cc922a9e6c1f48d3136 Mon Sep 17 00:00:00 2001 From: "terry.hung" Date: Thu, 25 Apr 2024 18:00:09 +0800 Subject: [PATCH 4/6] Fix the test and lint --- chain/indexer/integrated/processor/state_internal_test.go | 2 +- chain/indexer/integrated/processor/state_test.go | 2 +- storage/sql.go | 1 + tasks/actorstate/market/sector_deals_v2.go | 3 --- 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/chain/indexer/integrated/processor/state_internal_test.go b/chain/indexer/integrated/processor/state_internal_test.go index 0654cb324..9e955b6f4 100644 --- a/chain/indexer/integrated/processor/state_internal_test.go +++ b/chain/indexer/integrated/processor/state_internal_test.go @@ -50,7 +50,7 @@ func TestNewProcessor(t *testing.T) { proc, err := New(nil, t.Name(), tasktype.AllTableTasks) require.NoError(t, err) require.Equal(t, t.Name(), proc.name) - require.Len(t, proc.actorProcessors, 25) + require.Len(t, proc.actorProcessors, 26) require.Len(t, proc.tipsetProcessors, 10) require.Len(t, proc.tipsetsProcessors, 15) require.Len(t, proc.builtinProcessors, 1) diff --git a/chain/indexer/integrated/processor/state_test.go b/chain/indexer/integrated/processor/state_test.go index 78aa62048..84eec88b1 100644 --- a/chain/indexer/integrated/processor/state_test.go +++ b/chain/indexer/integrated/processor/state_test.go @@ -439,7 +439,7 @@ func TestMakeProcessorsAllTasks(t *testing.T) { // If this test fails it indicates a new processor and/or task name was added and test should be created for it in one of the above test cases. proc, err := processor.MakeProcessors(nil, append(tasktype.AllTableTasks, processor.BuiltinTaskName)) require.NoError(t, err) - require.Len(t, proc.ActorProcessors, 25) + require.Len(t, proc.ActorProcessors, 26) require.Len(t, proc.TipsetProcessors, 10) require.Len(t, proc.TipsetsProcessors, 15) require.Len(t, proc.ReportProcessors, 1) diff --git a/storage/sql.go b/storage/sql.go index f99354d01..6f0823cb3 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -58,6 +58,7 @@ var Models = []interface{}{ (*miner.MinerFeeDebt)(nil), (*miner.MinerLockedFund)(nil), (*miner.MinerInfo)(nil), + (*miner.MinerSectorDealV2)(nil), (*market.MarketDealProposal)(nil), (*market.MarketDealState)(nil), diff --git a/tasks/actorstate/market/sector_deals_v2.go b/tasks/actorstate/market/sector_deals_v2.go index 5321b34e3..25c432f85 100644 --- a/tasks/actorstate/market/sector_deals_v2.go +++ b/tasks/actorstate/market/sector_deals_v2.go @@ -13,15 +13,12 @@ import ( marketmodel "github.com/filecoin-project/lily/model/actors/market" "github.com/filecoin-project/lily/model/actors/miner" "github.com/filecoin-project/lily/tasks/actorstate" - logging "github.com/ipfs/go-log/v2" ) var _ actorstate.ActorStateExtractor = (*SectorDealStateExtractor)(nil) type SectorDealStateExtractor struct{} -var logger = logging.Logger("lily/tasks/sector_deal_logger") - func (SectorDealStateExtractor) Extract(ctx context.Context, a actorstate.ActorInfo, node actorstate.ActorStateAPI) (model.Persistable, error) { log.Debugw("extract", zap.String("extractor", "SectorDealStateExtractor"), zap.Inline(a)) ctx, span := otel.Tracer("").Start(ctx, "SectorDealStateExtractor.Extract") From 69bfee0c527137ef3e940b93d4dc4eba1c929a8f Mon Sep 17 00:00:00 2001 From: "terry.hung" Date: Thu, 25 Apr 2024 18:05:13 +0800 Subject: [PATCH 5/6] Fix test --- chain/indexer/tasktype/tasks_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/indexer/tasktype/tasks_test.go b/chain/indexer/tasktype/tasks_test.go index 939f103c3..78f95245e 100644 --- a/chain/indexer/tasktype/tasks_test.go +++ b/chain/indexer/tasktype/tasks_test.go @@ -39,7 +39,7 @@ func TestMakeTaskNamesAlias(t *testing.T) { }, { taskAlias: tasktype.ActorStatesMarketTask, - tasks: []string{tasktype.MarketDealProposal, tasktype.MarketDealState}, + tasks: []string{tasktype.MarketDealProposal, tasktype.MarketDealState, tasktype.MinerSectorDealV2}, }, { taskAlias: tasktype.ActorStatesMultisigTask, From 0e12315b31fcee0e99d4c46ad62dbeee7ed3398f Mon Sep 17 00:00:00 2001 From: "terry.hung" Date: Thu, 25 Apr 2024 19:46:59 +0800 Subject: [PATCH 6/6] Add more error log and refine the variable name --- tasks/actorstate/market/sector_deals_v2.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tasks/actorstate/market/sector_deals_v2.go b/tasks/actorstate/market/sector_deals_v2.go index 25c432f85..fca949eb9 100644 --- a/tasks/actorstate/market/sector_deals_v2.go +++ b/tasks/actorstate/market/sector_deals_v2.go @@ -65,10 +65,14 @@ func (SectorDealStateExtractor) Extract(ctx context.Context, a actorstate.ActorI } result, err := ec.CurrState.GetProviderSectors() + if err != nil { + log.Errorf("Get the errors during getting provider sectors: %v", err) + return nil, nil + } dealSectorMap := make(map[abi.DealID]abi.SectorID) - for sectorId, dealIDs := range result { + for sectorID, dealIDs := range result { for _, dealID := range dealIDs { - dealSectorMap[dealID] = sectorId + dealSectorMap[dealID] = sectorID } }