Skip to content

Commit

Permalink
fix: batch insert for locked_funds, sector_events and sector_info_v7 (#…
Browse files Browse the repository at this point in the history
…1216)

* fix: batch insert into database

* added transformers and minor fixes

* fix tests

* fix more tests
  • Loading branch information
birdychang authored Jun 5, 2023
1 parent 5a34226 commit 344e9a8
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 42 deletions.
42 changes: 27 additions & 15 deletions chain/indexer/integrated/processor/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,9 +441,13 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces
mineractors.AllCodes(), minertask.InfoExtractor{},
))
case tasktype.MinerLockedFund:
out.ActorProcessors[t] = actorstate.NewTask(api, actorstate.NewTypedActorExtractorMap(
mineractors.AllCodes(), minertask.LockedFundsExtractor{},
))
out.ActorProcessors[t] = actorstate.NewTaskWithTransformer(
api,
actorstate.NewTypedActorExtractorMap(
mineractors.AllCodes(), minertask.LockedFundsExtractor{},
),
minertask.LockedFundsExtractor{},
)
case tasktype.MinerPreCommitInfo:
out.ActorProcessors[t] = actorstate.NewTask(api, actorstate.NewCustomTypedActorExtractorMap(
map[cid.Cid][]actorstate.ActorStateExtractor{
Expand All @@ -465,9 +469,13 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces
mineractors.AllCodes(), minertask.SectorDealsExtractor{},
))
case tasktype.MinerSectorEvent:
out.ActorProcessors[t] = actorstate.NewTask(api, actorstate.NewTypedActorExtractorMap(
mineractors.AllCodes(), minertask.SectorEventsExtractor{},
))
out.ActorProcessors[t] = actorstate.NewTaskWithTransformer(
api,
actorstate.NewTypedActorExtractorMap(
mineractors.AllCodes(), minertask.SectorEventsExtractor{},
),
minertask.SectorEventsExtractor{},
)
case tasktype.MinerSectorPost:
out.ActorProcessors[t] = actorstate.NewTask(api, actorstate.NewTypedActorExtractorMap(
mineractors.AllCodes(), minertask.PoStExtractor{},
Expand All @@ -484,15 +492,19 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces
},
))
case tasktype.MinerSectorInfoV7:
out.ActorProcessors[t] = actorstate.NewTask(api, actorstate.NewCustomTypedActorExtractorMap(
map[cid.Cid][]actorstate.ActorStateExtractor{
mineractors.VersionCodes()[actorstypes.Version7]: {minertask.V7SectorInfoExtractor{}},
mineractors.VersionCodes()[actorstypes.Version8]: {minertask.V7SectorInfoExtractor{}},
mineractors.VersionCodes()[actorstypes.Version9]: {minertask.V7SectorInfoExtractor{}},
mineractors.VersionCodes()[actorstypes.Version10]: {minertask.V7SectorInfoExtractor{}},
mineractors.VersionCodes()[actorstypes.Version11]: {minertask.V7SectorInfoExtractor{}},
},
))
out.ActorProcessors[t] = actorstate.NewTaskWithTransformer(
api,
actorstate.NewCustomTypedActorExtractorMap(
map[cid.Cid][]actorstate.ActorStateExtractor{
mineractors.VersionCodes()[actorstypes.Version7]: {minertask.V7SectorInfoExtractor{}},
mineractors.VersionCodes()[actorstypes.Version8]: {minertask.V7SectorInfoExtractor{}},
mineractors.VersionCodes()[actorstypes.Version9]: {minertask.V7SectorInfoExtractor{}},
mineractors.VersionCodes()[actorstypes.Version10]: {minertask.V7SectorInfoExtractor{}},
mineractors.VersionCodes()[actorstypes.Version11]: {minertask.V7SectorInfoExtractor{}},
},
),
minertask.V7SectorInfoExtractor{},
)

//
// Power
Expand Down
28 changes: 17 additions & 11 deletions chain/indexer/integrated/processor/state_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ func TestNewProcessor(t *testing.T) {
require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.DeadlineInfoExtractor{})), proc.actorProcessors[tasktype.MinerCurrentDeadlineInfo])
require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.FeeDebtExtractor{})), proc.actorProcessors[tasktype.MinerFeeDebt])
require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.InfoExtractor{})), proc.actorProcessors[tasktype.MinerInfo])
require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.LockedFundsExtractor{})), proc.actorProcessors[tasktype.MinerLockedFund])
require.Equal(t, actorstate.NewTaskWithTransformer(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.LockedFundsExtractor{}), minertask.LockedFundsExtractor{}), proc.actorProcessors[tasktype.MinerLockedFund])
require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.SectorDealsExtractor{})), proc.actorProcessors[tasktype.MinerSectorDeal])
require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.SectorEventsExtractor{})), proc.actorProcessors[tasktype.MinerSectorEvent])
require.Equal(t, actorstate.NewTaskWithTransformer(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.SectorEventsExtractor{}), minertask.SectorEventsExtractor{}), proc.actorProcessors[tasktype.MinerSectorEvent])
require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.PoStExtractor{})), proc.actorProcessors[tasktype.MinerSectorPost])
require.Equal(t, actorstate.NewTask(nil, actorstate.NewCustomTypedActorExtractorMap(
map[cid.Cid][]actorstate.ActorStateExtractor{
Expand All @@ -94,15 +94,21 @@ func TestNewProcessor(t *testing.T) {
miner.VersionCodes()[actorstypes.Version6]: {minertask.SectorInfoExtractor{}},
},
)), proc.actorProcessors[tasktype.MinerSectorInfoV1_6])
require.Equal(t, actorstate.NewTask(nil, actorstate.NewCustomTypedActorExtractorMap(
map[cid.Cid][]actorstate.ActorStateExtractor{
miner.VersionCodes()[actorstypes.Version7]: {minertask.V7SectorInfoExtractor{}},
miner.VersionCodes()[actorstypes.Version8]: {minertask.V7SectorInfoExtractor{}},
miner.VersionCodes()[actorstypes.Version9]: {minertask.V7SectorInfoExtractor{}},
miner.VersionCodes()[actorstypes.Version10]: {minertask.V7SectorInfoExtractor{}},
miner.VersionCodes()[actorstypes.Version11]: {minertask.V7SectorInfoExtractor{}},
},
)), proc.actorProcessors[tasktype.MinerSectorInfoV7])
require.Equal(
t,
actorstate.NewTaskWithTransformer(
nil,
actorstate.NewCustomTypedActorExtractorMap(
map[cid.Cid][]actorstate.ActorStateExtractor{
miner.VersionCodes()[actorstypes.Version7]: {minertask.V7SectorInfoExtractor{}},
miner.VersionCodes()[actorstypes.Version8]: {minertask.V7SectorInfoExtractor{}},
miner.VersionCodes()[actorstypes.Version9]: {minertask.V7SectorInfoExtractor{}},
miner.VersionCodes()[actorstypes.Version10]: {minertask.V7SectorInfoExtractor{}},
miner.VersionCodes()[actorstypes.Version11]: {minertask.V7SectorInfoExtractor{}},
}),
minertask.V7SectorInfoExtractor{},
),
proc.actorProcessors[tasktype.MinerSectorInfoV7])
require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(power.AllCodes(), powertask.ClaimedPowerExtractor{})), proc.actorProcessors[tasktype.PowerActorClaim])
require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(power.AllCodes(), powertask.ChainPowerExtractor{})), proc.actorProcessors[tasktype.ChainPower])
require.Equal(t, actorstate.NewTask(nil, actorstate.NewTypedActorExtractorMap(reward.AllCodes(), rewardtask.RewardExtractor{})), proc.actorProcessors[tasktype.ChainReward])
Expand Down
37 changes: 27 additions & 10 deletions chain/indexer/integrated/processor/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ func TestMakeProcessorsActors(t *testing.T) {
taskName: tasktype.MinerInfo,
extractor: actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.InfoExtractor{}),
},
{
taskName: tasktype.MinerLockedFund,
extractor: actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.LockedFundsExtractor{}),
},
{
taskName: tasktype.MinerPreCommitInfo,
extractor: actorstate.NewCustomTypedActorExtractorMap(
Expand All @@ -88,10 +84,6 @@ func TestMakeProcessorsActors(t *testing.T) {
taskName: tasktype.MinerSectorDeal,
extractor: actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.SectorDealsExtractor{}),
},
{
taskName: tasktype.MinerSectorEvent,
extractor: actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.SectorEventsExtractor{}),
},
{
taskName: tasktype.MinerSectorPost,
extractor: actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.PoStExtractor{}),
Expand All @@ -109,6 +101,30 @@ func TestMakeProcessorsActors(t *testing.T) {
},
),
},
}
for _, tc := range testCases {
t.Run(tc.taskName, func(t *testing.T) {
proc, err := processor.MakeProcessors(nil, []string{tc.taskName})
require.NoError(t, err)
require.Len(t, proc.ActorProcessors, 1)
require.Equal(t, actorstate.NewTask(nil, tc.extractor), proc.ActorProcessors[tc.taskName])
})
}
testCases2 := []struct {
taskName string
extractor actorstate.ActorExtractorMap
transformer actorstate.ActorDataTransformer
}{
{
taskName: tasktype.MinerLockedFund,
extractor: actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.LockedFundsExtractor{}),
transformer: minertask.LockedFundsExtractor{},
},
{
taskName: tasktype.MinerSectorEvent,
extractor: actorstate.NewTypedActorExtractorMap(miner.AllCodes(), minertask.SectorEventsExtractor{}),
transformer: minertask.SectorEventsExtractor{},
},
{
taskName: tasktype.MinerSectorInfoV7,
extractor: actorstate.NewCustomTypedActorExtractorMap(
Expand All @@ -120,14 +136,15 @@ func TestMakeProcessorsActors(t *testing.T) {
miner.VersionCodes()[actorstypes.Version11]: {minertask.V7SectorInfoExtractor{}},
},
),
transformer: minertask.V7SectorInfoExtractor{},
},
}
for _, tc := range testCases {
for _, tc := range testCases2 {
t.Run(tc.taskName, func(t *testing.T) {
proc, err := processor.MakeProcessors(nil, []string{tc.taskName})
require.NoError(t, err)
require.Len(t, proc.ActorProcessors, 1)
require.Equal(t, actorstate.NewTask(nil, tc.extractor), proc.ActorProcessors[tc.taskName])
require.Equal(t, actorstate.NewTaskWithTransformer(nil, tc.extractor, tc.transformer), proc.ActorProcessors[tc.taskName])
})
}
})
Expand Down
2 changes: 1 addition & 1 deletion model/actors/init/idaddress.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (ia *IDAddress) Persist(ctx context.Context, s model.StorageBatch, version
type IDAddressList []*IDAddress

func (ias IDAddressList) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
ctx, span := otel.Tracer("").Start(ctx, "IDAddressList.PersistWithTx")
ctx, span := otel.Tracer("").Start(ctx, "IDAddressList.Persist")
if span.IsRecording() {
span.SetAttributes(attribute.Int("count", len(ias)))
}
Expand Down
2 changes: 1 addition & 1 deletion model/actors/market/dealstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (ds *MarketDealState) Persist(ctx context.Context, s model.StorageBatch, ve
type MarketDealStates []*MarketDealState

func (dss MarketDealStates) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
ctx, span := otel.Tracer("").Start(ctx, "MarketDealStates.PersistWithTx")
ctx, span := otel.Tracer("").Start(ctx, "MarketDealStates.Persist")
if span.IsRecording() {
span.SetAttributes(attribute.Int("count", len(dss)))
}
Expand Down
6 changes: 2 additions & 4 deletions model/actors/power/chainpower.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (cp *ChainPower) AsVersion(version model.Version) (interface{}, bool) {
}

func (cp *ChainPower) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
ctx, span := otel.Tracer("").Start(ctx, "ChainPower.PersistWithTx")
ctx, span := otel.Tracer("").Start(ctx, "ChainPower.Persist")
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "chain_powers"))
Expand All @@ -96,10 +96,8 @@ func (cp *ChainPower) Persist(ctx context.Context, s model.StorageBatch, version
// ChainPowerList is a slice of ChainPowers for batch insertion.
type ChainPowerList []*ChainPower

// PersistWithTx makes a batch insertion of the list using the given
// transaction.
func (cpl ChainPowerList) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
ctx, span := otel.Tracer("").Start(ctx, "ChainPowerList.PersistWithTx")
ctx, span := otel.Tracer("").Start(ctx, "ChainPowerList.Persist")
if span.IsRecording() {
span.SetAttributes(attribute.Int("count", len(cpl)))
}
Expand Down
15 changes: 15 additions & 0 deletions tasks/actorstate/miner/locked_funds.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,18 @@ func (LockedFundsExtractor) Extract(ctx context.Context, a actorstate.ActorInfo,
PreCommitDeposits: currLocked.PreCommitDeposits.String(),
}, nil
}

func (LockedFundsExtractor) Transform(ctx context.Context, data model.PersistableList) (model.PersistableList, error) {
persistableList := make(minermodel.MinerLockedFundsList, 0, len(data))
for _, d := range data {
if d == nil {
continue
}
a, ok := d.(*minermodel.MinerLockedFund)
if !ok {
return nil, fmt.Errorf("expected MinerLockedFund type but got: %T", d)
}
persistableList = append(persistableList, a)
}
return model.PersistableList{persistableList}, nil
}
14 changes: 14 additions & 0 deletions tasks/actorstate/miner/sector_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,20 @@ func (SectorEventsExtractor) Extract(ctx context.Context, a actorstate.ActorInfo
return sectorEventModel, nil
}

func (SectorEventsExtractor) Transform(ctx context.Context, data model.PersistableList) (model.PersistableList, error) {
persistableList := make(minermodel.MinerSectorEventList, 0, len(data))
for _, d := range data {
ml, ok := d.(minermodel.MinerSectorEventList)
if !ok {
return nil, fmt.Errorf("expected MinerSectorEventList type but got: %T", d)
}
for _, m := range ml {
persistableList = append(persistableList, m)
}
}
return model.PersistableList{persistableList}, nil
}

// ExtractSectorEvents transforms sectorChanges, preCommitChanges, and sectorStateChanges to a MinerSectorEventList.
func ExtractSectorEvents(extState extraction.State, sectorChanges *miner.SectorChanges, preCommitChanges *miner.PreCommitChanges, sectorStateChanges *SectorStateEvents) (minermodel.MinerSectorEventList, error) {
sectorStateEvents, err := ExtractMinerSectorStateEvents(extState, sectorStateChanges)
Expand Down
14 changes: 14 additions & 0 deletions tasks/actorstate/miner/sectorv7.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,17 @@ func (V7SectorInfoExtractor) Extract(ctx context.Context, a actorstate.ActorInfo

return sectorModel, nil
}

func (V7SectorInfoExtractor) Transform(ctx context.Context, data model.PersistableList) (model.PersistableList, error) {
persistableList := make(minermodel.MinerSectorInfoV7List, 0, len(data))
for _, d := range data {
ml, ok := d.(minermodel.MinerSectorInfoV7List)
if !ok {
return nil, fmt.Errorf("expected MinerSectorInfoV7 type but got: %T", d)
}
for _, m := range ml {
persistableList = append(persistableList, m)
}
}
return model.PersistableList{persistableList}, nil
}

0 comments on commit 344e9a8

Please sign in to comment.