Skip to content

Commit

Permalink
feat(task): add reward actor processing (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
frrist authored Sep 21, 2020
1 parent f074f46 commit 93c7749
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 3 deletions.
35 changes: 35 additions & 0 deletions model/actors/reward/chainreward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package reward

import (
"context"
"github.com/go-pg/pg/v10"
)

type ChainReward struct {
StateRoot string `pg:",pk,notnull"`
CumSumBaseline string `pg:",notnull"`
CumSumRealized string `pg:",notnull"`
EffectiveBaselinePower string `pg:",notnull"`
NewBaselinePower string `pg:",notnull"`
NewRewardSmoothedPositionEstimate string `pg:",notnull"`
NewRewardSmoothedVelocityEstimate string `pg:",notnull"`
TotalMinedReward string `pg:",notnull"`

NewReward string `pg:",use_zero"`
EffectiveNetworkTime int64 `pg:",use_zero"`
}

func (r *ChainReward) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
if _, err := tx.ModelContext(ctx, r).
OnConflict("do nothing").
Insert(); err != nil {
return err
}
return nil
}

func (r *ChainReward) Persist(ctx context.Context, db *pg.DB) error {
return db.RunInTransaction(ctx, func(tx *pg.Tx) error {
return r.PersistWithTx(ctx, tx)
})
}
34 changes: 32 additions & 2 deletions services/processor/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package processor

import (
"github.com/filecoin-project/sentinel-visor/services/processor/tasks/power"
"github.com/gocraft/work"
"github.com/gomodule/redigo/redis"
"github.com/ipfs/go-cid"
Expand All @@ -16,6 +15,8 @@ import (
"github.com/filecoin-project/sentinel-visor/services/processor/tasks/market"
"github.com/filecoin-project/sentinel-visor/services/processor/tasks/message"
"github.com/filecoin-project/sentinel-visor/services/processor/tasks/miner"
"github.com/filecoin-project/sentinel-visor/services/processor/tasks/power"
"github.com/filecoin-project/sentinel-visor/services/processor/tasks/reward"
)

const (
Expand All @@ -33,6 +34,9 @@ const (

PowerTaskName = "process_power"
PowerPoolName = "power_actor_tasks"

RewardTaskName = "process_reward"
RewardPoolName = "reward_tasks"
)

// Make a redis pool
Expand All @@ -51,14 +55,17 @@ func NewScheduler(node lens.API, pubCh chan<- model.Persistable) *Scheduler {
marketPool, marketQueue := market.Setup(64, MarketTaskName, MarketPoolName, redisPool, node, pubCh)
msgPool, msgQueue := message.Setup(64, MessageTaskName, MessagePoolName, redisPool, node, pubCh)
powerPool, powerQueue := power.Setup(64, PowerTaskName, PowerPoolName, redisPool, node, pubCh)
rwdPool, rwdQueue := reward.Setup(4, RewardTaskName, RewardPoolName, redisPool, node, pubCh)

pools := []*work.WorkerPool{genesisPool, minerPool, marketPool, powerPool, msgPool, rwdPool}

pools := []*work.WorkerPool{genesisPool, minerPool, marketPool, powerPool, msgPool}
queues := map[string]*work.Enqueuer{
GenesisTaskName: genesisQueue,
MinerTaskName: minerQueue,
MarketTaskName: marketQueue,
MessageTaskName: msgQueue,
PowerTaskName: powerQueue,
RewardTaskName: rwdQueue,
}

return &Scheduler{
Expand Down Expand Up @@ -111,6 +118,11 @@ func (s *Scheduler) Dispatch(tips indexer.ActorTips) error {
if err != nil {
return err
}
case builtin.RewardActorCodeID:
_, err := s.queueRewardTask(actor)
if err != nil {
return err
}
}
}
}
Expand Down Expand Up @@ -194,3 +206,21 @@ func (s *Scheduler) queueMessageTask(ts types.TipSetKey, st cid.Cid) (*work.Job,
"stateroot": st.String(),
})
}

func (s *Scheduler) queueRewardTask(info indexer.ActorInfo) (*work.Job, error) {
tsB, err := info.TipSet.MarshalJSON()
if err != nil {
return nil, err
}
ptsB, err := info.ParentTipSet.MarshalJSON()
if err != nil {
return nil, err
}
return s.queues[RewardTaskName].Enqueue(RewardTaskName, work.Q{
"ts": string(tsB),
"pts": string(ptsB),
"head": info.Actor.Head.String(),
"stateroot": info.ParentStateRoot.String(),
})

}
127 changes: 127 additions & 0 deletions services/processor/tasks/reward/reward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package reward

import (
"bytes"
"context"
"github.com/gocraft/work"
"github.com/gomodule/redigo/redis"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"

"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/builtin/reward"

"github.com/filecoin-project/sentinel-visor/lens"
"github.com/filecoin-project/sentinel-visor/model"
rewardmodel "github.com/filecoin-project/sentinel-visor/model/actors/reward"
)

func Setup(concurrency uint, taskName, poolName string, redisPool *redis.Pool, node lens.API, pubCh chan<- model.Persistable) (*work.WorkerPool, *work.Enqueuer) {
pool := work.NewWorkerPool(ProcessRewardTask{}, concurrency, poolName, redisPool)
queue := work.NewEnqueuer(poolName, redisPool)

// https://github.com/gocraft/work/issues/10#issuecomment-237580604
// adding fields via a closure gives the workers access to the lotus api, a global could also be used here
pool.Middleware(func(mt *ProcessRewardTask, job *work.Job, next work.NextMiddlewareFunc) error {
mt.node = node
mt.pubCh = pubCh
mt.log = logging.Logger("rewardtask")
return next()
})
logging.SetLogLevel("rewardtask", "debug")
// log all task
pool.Middleware((*ProcessRewardTask).Log)

// register task method and don't allow retying
pool.JobWithOptions(taskName, work.JobOptions{
MaxFails: 1,
}, (*ProcessRewardTask).Task)

return pool, queue
}

type ProcessRewardTask struct {
node lens.API
log *logging.ZapEventLogger

pubCh chan<- model.Persistable

ts types.TipSetKey
head cid.Cid
stateroot cid.Cid
}

func (p *ProcessRewardTask) Log(job *work.Job, next work.NextMiddlewareFunc) error {
p.log.Infow("starting process reward task", "job", job.ID, "args", job.Args)
return next()
}

func (p *ProcessRewardTask) ParseArgs(job *work.Job) error {
tsStr := job.ArgString("ts")
if err := job.ArgError(); err != nil {
return err
}

headStr := job.ArgString("head")
if err := job.ArgError(); err != nil {
return err
}

srStr := job.ArgString("stateroot")
if err := job.ArgError(); err != nil {
return err
}

stateroot, err := cid.Decode(srStr)
if err != nil {
return err
}

head, err := cid.Decode(headStr)
if err != nil {
return err
}

var tsKey types.TipSetKey
if err := tsKey.UnmarshalJSON([]byte(tsStr)); err != nil {
return err
}

p.ts = tsKey
p.head = head
p.stateroot = stateroot
return nil
}

func (p *ProcessRewardTask) Task(job *work.Job) error {
if err := p.ParseArgs(job); err != nil {
return err
}

ctx := context.TODO()

rewardStateRaw, err := p.node.ChainReadObj(ctx, p.head)
if err != nil {
return err
}

var rwdState reward.State
if err := rwdState.UnmarshalCBOR(bytes.NewReader(rewardStateRaw)); err != nil {
return err
}

p.pubCh <- &rewardmodel.ChainReward{
StateRoot: p.stateroot.String(),
CumSumBaseline: rwdState.CumsumBaseline.String(),
CumSumRealized: rwdState.CumsumRealized.String(),
EffectiveBaselinePower: rwdState.EffectiveBaselinePower.String(),
NewBaselinePower: rwdState.ThisEpochBaselinePower.String(),
NewRewardSmoothedPositionEstimate: rwdState.ThisEpochRewardSmoothed.PositionEstimate.String(),
NewRewardSmoothedVelocityEstimate: rwdState.ThisEpochRewardSmoothed.VelocityEstimate.String(),
TotalMinedReward: rwdState.TotalMined.String(),
NewReward: rwdState.ThisEpochReward.String(),
EffectiveNetworkTime: int64(rwdState.EffectiveNetworkTime),
}

return nil
}
4 changes: 3 additions & 1 deletion storage/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package storage

import (
"context"
"github.com/filecoin-project/sentinel-visor/model/actors/power"
"time"

"github.com/go-pg/pg/v10"
Expand All @@ -12,6 +11,8 @@ import (

"github.com/filecoin-project/sentinel-visor/model/actors/market"
"github.com/filecoin-project/sentinel-visor/model/actors/miner"
"github.com/filecoin-project/sentinel-visor/model/actors/power"
"github.com/filecoin-project/sentinel-visor/model/actors/reward"
"github.com/filecoin-project/sentinel-visor/model/blocks"
"github.com/filecoin-project/sentinel-visor/model/messages"
)
Expand All @@ -38,6 +39,7 @@ var models = []interface{}{
(*messages.Receipt)(nil),

(*power.ChainPower)(nil),
(*reward.ChainReward)(nil),
}

func NewDatabase(ctx context.Context, url string) (*Database, error) {
Expand Down

0 comments on commit 93c7749

Please sign in to comment.