From 344e9a8a245e88bdb768b56c4491679d31ba56d4 Mon Sep 17 00:00:00 2001 From: Po-Chun Chang <31112892+birdychang@users.noreply.github.com> Date: Sun, 4 Jun 2023 21:13:49 -0700 Subject: [PATCH] fix: batch insert for locked_funds, sector_events and sector_info_v7 (#1216) * fix: batch insert into database * added transformers and minor fixes * fix tests * fix more tests --- chain/indexer/integrated/processor/state.go | 42 ++++++++++++------- .../processor/state_internal_test.go | 28 ++++++++----- .../integrated/processor/state_test.go | 37 +++++++++++----- model/actors/init/idaddress.go | 2 +- model/actors/market/dealstate.go | 2 +- model/actors/power/chainpower.go | 6 +-- tasks/actorstate/miner/locked_funds.go | 15 +++++++ tasks/actorstate/miner/sector_events.go | 14 +++++++ tasks/actorstate/miner/sectorv7.go | 14 +++++++ 9 files changed, 118 insertions(+), 42 deletions(-) diff --git a/chain/indexer/integrated/processor/state.go b/chain/indexer/integrated/processor/state.go index bfe68d2a4..6484a11c3 100644 --- a/chain/indexer/integrated/processor/state.go +++ b/chain/indexer/integrated/processor/state.go @@ -441,9 +441,13 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces mineractors.AllCodes(), minertask.InfoExtractor{}, )) case tasktype.MinerLockedFund: - out.ActorProcessors[t] = actorstate.NewTask(api, actorstate.NewTypedActorExtractorMap( - mineractors.AllCodes(), minertask.LockedFundsExtractor{}, - )) + out.ActorProcessors[t] = actorstate.NewTaskWithTransformer( + api, + actorstate.NewTypedActorExtractorMap( + mineractors.AllCodes(), minertask.LockedFundsExtractor{}, + ), + minertask.LockedFundsExtractor{}, + ) case tasktype.MinerPreCommitInfo: out.ActorProcessors[t] = actorstate.NewTask(api, actorstate.NewCustomTypedActorExtractorMap( map[cid.Cid][]actorstate.ActorStateExtractor{ @@ -465,9 +469,13 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces mineractors.AllCodes(), minertask.SectorDealsExtractor{}, )) case tasktype.MinerSectorEvent: - out.ActorProcessors[t] = actorstate.NewTask(api, actorstate.NewTypedActorExtractorMap( - mineractors.AllCodes(), minertask.SectorEventsExtractor{}, - )) + out.ActorProcessors[t] = actorstate.NewTaskWithTransformer( + api, + actorstate.NewTypedActorExtractorMap( + mineractors.AllCodes(), minertask.SectorEventsExtractor{}, + ), + minertask.SectorEventsExtractor{}, + ) case tasktype.MinerSectorPost: out.ActorProcessors[t] = actorstate.NewTask(api, actorstate.NewTypedActorExtractorMap( mineractors.AllCodes(), minertask.PoStExtractor{}, @@ -484,15 +492,19 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces }, )) case tasktype.MinerSectorInfoV7: - out.ActorProcessors[t] = actorstate.NewTask(api, actorstate.NewCustomTypedActorExtractorMap( - map[cid.Cid][]actorstate.ActorStateExtractor{ - mineractors.VersionCodes()[actorstypes.Version7]: {minertask.V7SectorInfoExtractor{}}, - mineractors.VersionCodes()[actorstypes.Version8]: {minertask.V7SectorInfoExtractor{}}, - mineractors.VersionCodes()[actorstypes.Version9]: {minertask.V7SectorInfoExtractor{}}, - mineractors.VersionCodes()[actorstypes.Version10]: {minertask.V7SectorInfoExtractor{}}, - mineractors.VersionCodes()[actorstypes.Version11]: {minertask.V7SectorInfoExtractor{}}, - }, - )) + out.ActorProcessors[t] = actorstate.NewTaskWithTransformer( + api, + actorstate.NewCustomTypedActorExtractorMap( + map[cid.Cid][]actorstate.ActorStateExtractor{ + mineractors.VersionCodes()[actorstypes.Version7]: {minertask.V7SectorInfoExtractor{}}, + mineractors.VersionCodes()[actorstypes.Version8]: {minertask.V7SectorInfoExtractor{}}, + mineractors.VersionCodes()[actorstypes.Version9]: {minertask.V7SectorInfoExtractor{}}, + mineractors.VersionCodes()[actorstypes.Version10]: {minertask.V7SectorInfoExtractor{}}, + mineractors.VersionCodes()[actorstypes.Version11]: {minertask.V7SectorInfoExtractor{}}, + }, + ), + minertask.V7SectorInfoExtractor{}, + ) // // Power diff --git a/chain/indexer/integrated/processor/state_internal_test.go b/chain/indexer/integrated/processor/state_internal_test.go index 5140392f6..8354cddc5 100644 --- a/chain/indexer/integrated/processor/state_internal_test.go +++ b/chain/indexer/integrated/processor/state_internal_test.go @@ -80,9 +80,9 @@ func TestNewProcessor(t *testing.T) { require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.DeadlineInfoExtractor{})), proc.actorProcessors[tasktype.MinerCurrentDeadlineInfo]) require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.FeeDebtExtractor{})), proc.actorProcessors[tasktype.MinerFeeDebt]) require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.InfoExtractor{})), proc.actorProcessors[tasktype.MinerInfo]) - require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.LockedFundsExtractor{})), proc.actorProcessors[tasktype.MinerLockedFund]) + require.Equal(t, actorstate.NewTaskWithTransformer(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.LockedFundsExtractor{}), minertask.LockedFundsExtractor{}), proc.actorProcessors[tasktype.MinerLockedFund]) require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.SectorDealsExtractor{})), proc.actorProcessors[tasktype.MinerSectorDeal]) - require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.SectorEventsExtractor{})), proc.actorProcessors[tasktype.MinerSectorEvent]) + require.Equal(t, actorstate.NewTaskWithTransformer(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.SectorEventsExtractor{}), minertask.SectorEventsExtractor{}), proc.actorProcessors[tasktype.MinerSectorEvent]) require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.PoStExtractor{})), proc.actorProcessors[tasktype.MinerSectorPost]) require.Equal(t, actorstate.NewTask(nil, actorstate.NewCustomTypedActorExtractorMap( map[cid.Cid][]actorstate.ActorStateExtractor{ @@ -94,15 +94,21 @@ func TestNewProcessor(t *testing.T) { miner.VersionCodes()[actorstypes.Version6]: {minertask.SectorInfoExtractor{}}, }, )), proc.actorProcessors[tasktype.MinerSectorInfoV1_6]) - require.Equal(t, actorstate.NewTask(nil, actorstate.NewCustomTypedActorExtractorMap( - map[cid.Cid][]actorstate.ActorStateExtractor{ - miner.VersionCodes()[actorstypes.Version7]: {minertask.V7SectorInfoExtractor{}}, - miner.VersionCodes()[actorstypes.Version8]: {minertask.V7SectorInfoExtractor{}}, - miner.VersionCodes()[actorstypes.Version9]: {minertask.V7SectorInfoExtractor{}}, - miner.VersionCodes()[actorstypes.Version10]: {minertask.V7SectorInfoExtractor{}}, - miner.VersionCodes()[actorstypes.Version11]: {minertask.V7SectorInfoExtractor{}}, - }, - )), proc.actorProcessors[tasktype.MinerSectorInfoV7]) + require.Equal( + t, + actorstate.NewTaskWithTransformer( + nil, + actorstate.NewCustomTypedActorExtractorMap( + map[cid.Cid][]actorstate.ActorStateExtractor{ + miner.VersionCodes()[actorstypes.Version7]: {minertask.V7SectorInfoExtractor{}}, + miner.VersionCodes()[actorstypes.Version8]: {minertask.V7SectorInfoExtractor{}}, + miner.VersionCodes()[actorstypes.Version9]: {minertask.V7SectorInfoExtractor{}}, + miner.VersionCodes()[actorstypes.Version10]: {minertask.V7SectorInfoExtractor{}}, + miner.VersionCodes()[actorstypes.Version11]: {minertask.V7SectorInfoExtractor{}}, + }), + minertask.V7SectorInfoExtractor{}, + ), + proc.actorProcessors[tasktype.MinerSectorInfoV7]) require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(power.AllCodes(), powertask.ClaimedPowerExtractor{})), proc.actorProcessors[tasktype.PowerActorClaim]) require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(power.AllCodes(), powertask.ChainPowerExtractor{})), proc.actorProcessors[tasktype.ChainPower]) require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(reward.AllCodes(), rewardtask.RewardExtractor{})), proc.actorProcessors[tasktype.ChainReward]) diff --git a/chain/indexer/integrated/processor/state_test.go b/chain/indexer/integrated/processor/state_test.go index f5f037073..be15248a4 100644 --- a/chain/indexer/integrated/processor/state_test.go +++ b/chain/indexer/integrated/processor/state_test.go @@ -62,10 +62,6 @@ func TestMakeProcessorsActors(t *testing.T) { taskName: tasktype.MinerInfo, extractor: actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.InfoExtractor{}), }, - { - taskName: tasktype.MinerLockedFund, - extractor: actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.LockedFundsExtractor{}), - }, { taskName: tasktype.MinerPreCommitInfo, extractor: actorstate.NewCustomTypedActorExtractorMap( @@ -88,10 +84,6 @@ func TestMakeProcessorsActors(t *testing.T) { taskName: tasktype.MinerSectorDeal, extractor: actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.SectorDealsExtractor{}), }, - { - taskName: tasktype.MinerSectorEvent, - extractor: actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.SectorEventsExtractor{}), - }, { taskName: tasktype.MinerSectorPost, extractor: actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.PoStExtractor{}), @@ -109,6 +101,30 @@ func TestMakeProcessorsActors(t *testing.T) { }, ), }, + } + for _, tc := range testCases { + t.Run(tc.taskName, func(t *testing.T) { + proc, err := processor.MakeProcessors(nil, []string{tc.taskName}) + require.NoError(t, err) + require.Len(t, proc.ActorProcessors, 1) + require.Equal(t, actorstate.NewTask(nil, tc.extractor), proc.ActorProcessors[tc.taskName]) + }) + } + testCases2 := []struct { + taskName string + extractor actorstate.ActorExtractorMap + transformer actorstate.ActorDataTransformer + }{ + { + taskName: tasktype.MinerLockedFund, + extractor: actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.LockedFundsExtractor{}), + transformer: minertask.LockedFundsExtractor{}, + }, + { + taskName: tasktype.MinerSectorEvent, + extractor: actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.SectorEventsExtractor{}), + transformer: minertask.SectorEventsExtractor{}, + }, { taskName: tasktype.MinerSectorInfoV7, extractor: actorstate.NewCustomTypedActorExtractorMap( @@ -120,14 +136,15 @@ func TestMakeProcessorsActors(t *testing.T) { miner.VersionCodes()[actorstypes.Version11]: {minertask.V7SectorInfoExtractor{}}, }, ), + transformer: minertask.V7SectorInfoExtractor{}, }, } - for _, tc := range testCases { + for _, tc := range testCases2 { t.Run(tc.taskName, func(t *testing.T) { proc, err := processor.MakeProcessors(nil, []string{tc.taskName}) require.NoError(t, err) require.Len(t, proc.ActorProcessors, 1) - require.Equal(t, actorstate.NewTask(nil, tc.extractor), proc.ActorProcessors[tc.taskName]) + require.Equal(t, actorstate.NewTaskWithTransformer(nil, tc.extractor, tc.transformer), proc.ActorProcessors[tc.taskName]) }) } }) diff --git a/model/actors/init/idaddress.go b/model/actors/init/idaddress.go index 45d18f4e0..352b18ca8 100644 --- a/model/actors/init/idaddress.go +++ b/model/actors/init/idaddress.go @@ -66,7 +66,7 @@ func (ia *IDAddress) Persist(ctx context.Context, s model.StorageBatch, version type IDAddressList []*IDAddress func (ias IDAddressList) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error { - ctx, span := otel.Tracer("").Start(ctx, "IDAddressList.PersistWithTx") + ctx, span := otel.Tracer("").Start(ctx, "IDAddressList.Persist") if span.IsRecording() { span.SetAttributes(attribute.Int("count", len(ias))) } diff --git a/model/actors/market/dealstate.go b/model/actors/market/dealstate.go index 9586da5bd..f10ee8e6a 100644 --- a/model/actors/market/dealstate.go +++ b/model/actors/market/dealstate.go @@ -29,7 +29,7 @@ func (ds *MarketDealState) Persist(ctx context.Context, s model.StorageBatch, ve type MarketDealStates []*MarketDealState func (dss MarketDealStates) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error { - ctx, span := otel.Tracer("").Start(ctx, "MarketDealStates.PersistWithTx") + ctx, span := otel.Tracer("").Start(ctx, "MarketDealStates.Persist") if span.IsRecording() { span.SetAttributes(attribute.Int("count", len(dss))) } diff --git a/model/actors/power/chainpower.go b/model/actors/power/chainpower.go index a0ebaddc9..eff25028e 100644 --- a/model/actors/power/chainpower.go +++ b/model/actors/power/chainpower.go @@ -79,7 +79,7 @@ func (cp *ChainPower) AsVersion(version model.Version) (interface{}, bool) { } func (cp *ChainPower) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error { - ctx, span := otel.Tracer("").Start(ctx, "ChainPower.PersistWithTx") + ctx, span := otel.Tracer("").Start(ctx, "ChainPower.Persist") defer span.End() ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "chain_powers")) @@ -96,10 +96,8 @@ func (cp *ChainPower) Persist(ctx context.Context, s model.StorageBatch, version // ChainPowerList is a slice of ChainPowers for batch insertion. type ChainPowerList []*ChainPower -// PersistWithTx makes a batch insertion of the list using the given -// transaction. func (cpl ChainPowerList) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error { - ctx, span := otel.Tracer("").Start(ctx, "ChainPowerList.PersistWithTx") + ctx, span := otel.Tracer("").Start(ctx, "ChainPowerList.Persist") if span.IsRecording() { span.SetAttributes(attribute.Int("count", len(cpl))) } diff --git a/tasks/actorstate/miner/locked_funds.go b/tasks/actorstate/miner/locked_funds.go index 0e3c4d0f8..7f4e2bf76 100644 --- a/tasks/actorstate/miner/locked_funds.go +++ b/tasks/actorstate/miner/locked_funds.go @@ -55,3 +55,18 @@ func (LockedFundsExtractor) Extract(ctx context.Context, a actorstate.ActorInfo, PreCommitDeposits: currLocked.PreCommitDeposits.String(), }, nil } + +func (LockedFundsExtractor) Transform(ctx context.Context, data model.PersistableList) (model.PersistableList, error) { + persistableList := make(minermodel.MinerLockedFundsList, 0, len(data)) + for _, d := range data { + if d == nil { + continue + } + a, ok := d.(*minermodel.MinerLockedFund) + if !ok { + return nil, fmt.Errorf("expected MinerLockedFund type but got: %T", d) + } + persistableList = append(persistableList, a) + } + return model.PersistableList{persistableList}, nil +} diff --git a/tasks/actorstate/miner/sector_events.go b/tasks/actorstate/miner/sector_events.go index e1a6d905a..13e67e6cb 100644 --- a/tasks/actorstate/miner/sector_events.go +++ b/tasks/actorstate/miner/sector_events.go @@ -133,6 +133,20 @@ func (SectorEventsExtractor) Extract(ctx context.Context, a actorstate.ActorInfo return sectorEventModel, nil } +func (SectorEventsExtractor) Transform(ctx context.Context, data model.PersistableList) (model.PersistableList, error) { + persistableList := make(minermodel.MinerSectorEventList, 0, len(data)) + for _, d := range data { + ml, ok := d.(minermodel.MinerSectorEventList) + if !ok { + return nil, fmt.Errorf("expected MinerSectorEventList type but got: %T", d) + } + for _, m := range ml { + persistableList = append(persistableList, m) + } + } + return model.PersistableList{persistableList}, nil +} + // ExtractSectorEvents transforms sectorChanges, preCommitChanges, and sectorStateChanges to a MinerSectorEventList. func ExtractSectorEvents(extState extraction.State, sectorChanges *miner.SectorChanges, preCommitChanges *miner.PreCommitChanges, sectorStateChanges *SectorStateEvents) (minermodel.MinerSectorEventList, error) { sectorStateEvents, err := ExtractMinerSectorStateEvents(extState, sectorStateChanges) diff --git a/tasks/actorstate/miner/sectorv7.go b/tasks/actorstate/miner/sectorv7.go index c02eba91e..13c026ba1 100644 --- a/tasks/actorstate/miner/sectorv7.go +++ b/tasks/actorstate/miner/sectorv7.go @@ -76,3 +76,17 @@ func (V7SectorInfoExtractor) Extract(ctx context.Context, a actorstate.ActorInfo return sectorModel, nil } + +func (V7SectorInfoExtractor) Transform(ctx context.Context, data model.PersistableList) (model.PersistableList, error) { + persistableList := make(minermodel.MinerSectorInfoV7List, 0, len(data)) + for _, d := range data { + ml, ok := d.(minermodel.MinerSectorInfoV7List) + if !ok { + return nil, fmt.Errorf("expected MinerSectorInfoV7 type but got: %T", d) + } + for _, m := range ml { + persistableList = append(persistableList, m) + } + } + return model.PersistableList{persistableList}, nil +}