Skip to content

Commit

Permalink
feat(task): add actor, actor-state, and init actor processing (#14)
Browse files Browse the repository at this point in the history
* feat(task): add actor and actor state processing

* feat(task): add init actor processing
  • Loading branch information
frrist authored Sep 21, 2020
1 parent 93c7749 commit c5f66b5
Show file tree
Hide file tree
Showing 12 changed files with 590 additions and 14 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/filecoin-project/go-address v0.0.3
github.com/filecoin-project/go-bitfield v0.2.0
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200822201400-474f4fdccc52
github.com/filecoin-project/go-state-types v0.0.0-20200905071437-95828685f9df
github.com/filecoin-project/lotus v0.6.1
github.com/filecoin-project/specs-actors v0.9.7
github.com/go-pg/pg/v10 v10.0.7
Expand Down
40 changes: 40 additions & 0 deletions model/actors/common/actors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package common

import (
"context"
"github.com/go-pg/pg/v10"
)

type Actor struct {
ID string `pg:",pk,notnull"`
StateRoot string `pg:",pk,notnull"`
Code string `pg:",notnull"`
Head string `pg:",notnull"`
Balance string `pg:",notnull"`
Nonce uint64 `pg:",use_zero"`
}

func (a *Actor) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
if _, err := tx.ModelContext(ctx, a).
OnConflict("do nothing").
Insert(); err != nil {
return err
}
return nil
}

type ActorState struct {
Head string `pg:",pk,notnull"`
Code string `pg:",pk,notnull"`
State string `pg:",notnull"`
}

func (s *ActorState) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
if _, err := tx.ModelContext(ctx, s).
OnConflict("do nothing").
Insert(); err != nil {
return err
}
return nil

}
23 changes: 23 additions & 0 deletions model/actors/common/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package common

import (
"context"
"github.com/go-pg/pg/v10"
)

type ActorTaskResult struct {
Actor *Actor
State *ActorState
}

func (a *ActorTaskResult) Persist(ctx context.Context, db *pg.DB) error {
return db.RunInTransaction(ctx, func(tx *pg.Tx) error {
if err := a.Actor.PersistWithTx(ctx, tx); err != nil {
return err
}
if err := a.State.PersistWithTx(ctx, tx); err != nil {
return err
}
return nil
})
}
38 changes: 38 additions & 0 deletions model/actors/init/idaddress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package init

import (
"context"
"github.com/go-pg/pg/v10"
)

type IdAddress struct {
ID string `pg:",pk,notnull"`
Address string `pg:",pk,notnull"`
StateRoot string `pg:",pk,notnull"`
}

func (ia *IdAddress) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
if _, err := tx.ModelContext(ctx, ia).
OnConflict("do nothing").
Insert(); err != nil {
return err
}
return nil
}

type IdAddressList []*IdAddress

func (ias IdAddressList) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
for _, ia := range ias {
if err := ia.PersistWithTx(ctx, tx); err != nil {
return err
}
}
return nil
}

func (ias IdAddressList) Persist(ctx context.Context, db *pg.DB) error {
return db.RunInTransaction(ctx, func(tx *pg.Tx) error {
return ias.PersistWithTx(ctx, tx)
})
}
22 changes: 20 additions & 2 deletions model/genesis/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ package genesis
import (
"context"

init_ "github.com/filecoin-project/sentinel-visor/model/actors/init"
"github.com/filecoin-project/sentinel-visor/model/actors/market"
"github.com/filecoin-project/sentinel-visor/model/actors/miner"
"github.com/go-pg/pg/v10"
"go.opentelemetry.io/otel/api/global"
)

type ProcessGenesisSingletonResult struct {
minerResults []*GenesisMinerTaskResult
marketResult *GenesisMarketTaskResult
minerResults []*GenesisMinerTaskResult
marketResult *GenesisMarketTaskResult
initActorResult *GenesisInitActorTaskResult
}

func (r *ProcessGenesisSingletonResult) Persist(ctx context.Context, db *pg.DB) error {
Expand Down Expand Up @@ -40,6 +42,11 @@ func (r *ProcessGenesisSingletonResult) Persist(ctx context.Context, db *pg.DB)
return err
}
}
if r.initActorResult != nil {
if err := r.initActorResult.AddressMap.PersistWithTx(ctx, tx); err != nil {
return err
}
}
return nil
})
}
Expand All @@ -55,6 +62,13 @@ func (r *ProcessGenesisSingletonResult) SetMarket(m *GenesisMarketTaskResult) {
r.marketResult = m
}

func (r *ProcessGenesisSingletonResult) SetInitActor(m *GenesisInitActorTaskResult) {
if r.initActorResult != nil {
panic("Genesis InitActor State already set, developer error!!!")
}
r.initActorResult = m
}

type GenesisMinerTaskResult struct {
StateModel *miner.MinerState
PowerModel *miner.MinerPower
Expand All @@ -66,3 +80,7 @@ type GenesisMarketTaskResult struct {
DealModels market.MarketDealStates
ProposalModesl market.MarketDealProposals
}

type GenesisInitActorTaskResult struct {
AddressMap init_.IdAddressList
}
2 changes: 1 addition & 1 deletion services/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package processor

import (
"context"
"github.com/filecoin-project/lotus/lib/parmap"
"strings"
"sync"
"time"
Expand All @@ -15,6 +14,7 @@ import (

"github.com/filecoin-project/go-address"
types "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/parmap"
"github.com/filecoin-project/sentinel-visor/storage"

"github.com/filecoin-project/sentinel-visor/lens"
Expand Down
80 changes: 71 additions & 9 deletions services/processor/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import (
"github.com/gocraft/work"
"github.com/gomodule/redigo/redis"
"github.com/ipfs/go-cid"
"strconv"

"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/builtin"

"github.com/filecoin-project/sentinel-visor/lens"
"github.com/filecoin-project/sentinel-visor/model"
"github.com/filecoin-project/sentinel-visor/services/indexer"
"github.com/filecoin-project/sentinel-visor/services/processor/tasks/common"
"github.com/filecoin-project/sentinel-visor/services/processor/tasks/genesis"
init_ "github.com/filecoin-project/sentinel-visor/services/processor/tasks/init"
"github.com/filecoin-project/sentinel-visor/services/processor/tasks/market"
"github.com/filecoin-project/sentinel-visor/services/processor/tasks/message"
"github.com/filecoin-project/sentinel-visor/services/processor/tasks/miner"
Expand All @@ -21,7 +24,10 @@ import (

const (
GenesisTaskName = "process_genesis"
GensisPoolName = "genesis_tasks"
GenesisPoolName = "genesis_tasks"

InitActorTaskName = "process_init_actor"
InitActorPoolName = "init_actor_tasks"

MinerTaskName = "process_miner"
MinerPoolName = "miner_actor_tasks"
Expand All @@ -37,6 +43,9 @@ const (

RewardTaskName = "process_reward"
RewardPoolName = "reward_tasks"

CommonTaskName = "process_common"
CommonPoolName = "common_actor_tasks"
)

// Make a redis pool
Expand All @@ -50,22 +59,26 @@ var redisPool = &redis.Pool{
}

func NewScheduler(node lens.API, pubCh chan<- model.Persistable) *Scheduler {
genesisPool, genesisQueue := genesis.Setup(1, GenesisTaskName, GensisPoolName, redisPool, node, pubCh)
genesisPool, genesisQueue := genesis.Setup(1, GenesisTaskName, GenesisPoolName, redisPool, node, pubCh)
minerPool, minerQueue := miner.Setup(64, MinerTaskName, MinerPoolName, redisPool, node, pubCh)
marketPool, marketQueue := market.Setup(64, MarketTaskName, MarketPoolName, redisPool, node, pubCh)
msgPool, msgQueue := message.Setup(64, MessageTaskName, MessagePoolName, redisPool, node, pubCh)
powerPool, powerQueue := power.Setup(64, PowerTaskName, PowerPoolName, redisPool, node, pubCh)
rwdPool, rwdQueue := reward.Setup(4, RewardTaskName, RewardPoolName, redisPool, node, pubCh)
comPool, comQueue := common.Setup(64, CommonTaskName, CommonPoolName, redisPool, node, pubCh)
initPool, initQueue := init_.Setup(64, InitActorTaskName, InitActorPoolName, redisPool, node, pubCh)

pools := []*work.WorkerPool{genesisPool, minerPool, marketPool, powerPool, msgPool, rwdPool}
pools := []*work.WorkerPool{genesisPool, minerPool, marketPool, powerPool, msgPool, rwdPool, comPool, initPool}

queues := map[string]*work.Enqueuer{
GenesisTaskName: genesisQueue,
MinerTaskName: minerQueue,
MarketTaskName: marketQueue,
MessageTaskName: msgQueue,
PowerTaskName: powerQueue,
RewardTaskName: rwdQueue,
GenesisTaskName: genesisQueue,
MinerTaskName: minerQueue,
MarketTaskName: marketQueue,
MessageTaskName: msgQueue,
PowerTaskName: powerQueue,
RewardTaskName: rwdQueue,
CommonTaskName: comQueue,
InitActorTaskName: initQueue,
}

return &Scheduler{
Expand Down Expand Up @@ -102,7 +115,16 @@ func (s *Scheduler) Dispatch(tips indexer.ActorTips) error {
return err
}
for _, actor := range actors {
_, err := s.queueCommonTask(actor)
if err != nil {
return err
}
switch actor.Actor.Code {
case builtin.InitActorCodeID:
_, err := s.queueInitTask(actor)
if err != nil {
return err
}
case builtin.StorageMinerActorCodeID:
_, err := s.queueMinerTask(actor)
if err != nil {
Expand Down Expand Up @@ -182,6 +204,24 @@ func (s *Scheduler) queueMarketTask(info indexer.ActorInfo) (*work.Job, error) {
"address": info.Address.String(),
"stateroot": info.ParentStateRoot.String(),
})
}

func (s *Scheduler) queueInitTask(info indexer.ActorInfo) (*work.Job, error) {
tsB, err := info.TipSet.MarshalJSON()
if err != nil {
return nil, err
}
ptsB, err := info.ParentTipSet.MarshalJSON()
if err != nil {
return nil, err
}
return s.queues[InitActorTaskName].Enqueue(InitActorTaskName, work.Q{
"ts": string(tsB),
"pts": string(ptsB),
"head": info.Actor.Head.String(),
"address": info.Address.String(),
"stateroot": info.ParentStateRoot.String(),
})

}

Expand Down Expand Up @@ -216,11 +256,33 @@ func (s *Scheduler) queueRewardTask(info indexer.ActorInfo) (*work.Job, error) {
if err != nil {
return nil, err
}

return s.queues[RewardTaskName].Enqueue(RewardTaskName, work.Q{
"ts": string(tsB),
"pts": string(ptsB),
"head": info.Actor.Head.String(),
"stateroot": info.ParentStateRoot.String(),
})
}

func (s *Scheduler) queueCommonTask(info indexer.ActorInfo) (*work.Job, error) {
tsB, err := info.TipSet.MarshalJSON()
if err != nil {
return nil, err
}
ptsB, err := info.ParentTipSet.MarshalJSON()
if err != nil {
return nil, err
}

return s.queues[CommonTaskName].Enqueue(CommonTaskName, work.Q{
"ts": string(tsB),
"pts": string(ptsB),
"stateroot": info.ParentStateRoot.String(),
"address": info.Address.String(),
"head": info.Actor.Head.String(),
"code": info.Actor.Code.String(),
"balance": info.Actor.Balance.String(),
"nonce": strconv.FormatUint(info.Actor.Nonce, 10),
})
}
Loading

0 comments on commit c5f66b5

Please sign in to comment.