diff --git a/model/actors/reward/chainreward.go b/model/actors/reward/chainreward.go new file mode 100644 index 000000000..edaec54b2 --- /dev/null +++ b/model/actors/reward/chainreward.go @@ -0,0 +1,35 @@ +package reward + +import ( + "context" + "github.com/go-pg/pg/v10" +) + +type ChainReward struct { + StateRoot string `pg:",pk,notnull"` + CumSumBaseline string `pg:",notnull"` + CumSumRealized string `pg:",notnull"` + EffectiveBaselinePower string `pg:",notnull"` + NewBaselinePower string `pg:",notnull"` + NewRewardSmoothedPositionEstimate string `pg:",notnull"` + NewRewardSmoothedVelocityEstimate string `pg:",notnull"` + TotalMinedReward string `pg:",notnull"` + + NewReward string `pg:",use_zero"` + EffectiveNetworkTime int64 `pg:",use_zero"` +} + +func (r *ChainReward) PersistWithTx(ctx context.Context, tx *pg.Tx) error { + if _, err := tx.ModelContext(ctx, r). + OnConflict("do nothing"). + Insert(); err != nil { + return err + } + return nil +} + +func (r *ChainReward) Persist(ctx context.Context, db *pg.DB) error { + return db.RunInTransaction(ctx, func(tx *pg.Tx) error { + return r.PersistWithTx(ctx, tx) + }) +} diff --git a/services/processor/scheduler.go b/services/processor/scheduler.go index 269766d8f..b82083a8f 100644 --- a/services/processor/scheduler.go +++ b/services/processor/scheduler.go @@ -1,7 +1,6 @@ package processor import ( - "github.com/filecoin-project/sentinel-visor/services/processor/tasks/power" "github.com/gocraft/work" "github.com/gomodule/redigo/redis" "github.com/ipfs/go-cid" @@ -16,6 +15,8 @@ import ( "github.com/filecoin-project/sentinel-visor/services/processor/tasks/market" "github.com/filecoin-project/sentinel-visor/services/processor/tasks/message" "github.com/filecoin-project/sentinel-visor/services/processor/tasks/miner" + "github.com/filecoin-project/sentinel-visor/services/processor/tasks/power" + "github.com/filecoin-project/sentinel-visor/services/processor/tasks/reward" ) const ( @@ -33,6 +34,9 @@ const ( PowerTaskName = "process_power" PowerPoolName = "power_actor_tasks" + + RewardTaskName = "process_reward" + RewardPoolName = "reward_tasks" ) // Make a redis pool @@ -51,14 +55,17 @@ func NewScheduler(node lens.API, pubCh chan<- model.Persistable) *Scheduler { marketPool, marketQueue := market.Setup(64, MarketTaskName, MarketPoolName, redisPool, node, pubCh) msgPool, msgQueue := message.Setup(64, MessageTaskName, MessagePoolName, redisPool, node, pubCh) powerPool, powerQueue := power.Setup(64, PowerTaskName, PowerPoolName, redisPool, node, pubCh) + rwdPool, rwdQueue := reward.Setup(4, RewardTaskName, RewardPoolName, redisPool, node, pubCh) + + pools := []*work.WorkerPool{genesisPool, minerPool, marketPool, powerPool, msgPool, rwdPool} - pools := []*work.WorkerPool{genesisPool, minerPool, marketPool, powerPool, msgPool} queues := map[string]*work.Enqueuer{ GenesisTaskName: genesisQueue, MinerTaskName: minerQueue, MarketTaskName: marketQueue, MessageTaskName: msgQueue, PowerTaskName: powerQueue, + RewardTaskName: rwdQueue, } return &Scheduler{ @@ -111,6 +118,11 @@ func (s *Scheduler) Dispatch(tips indexer.ActorTips) error { if err != nil { return err } + case builtin.RewardActorCodeID: + _, err := s.queueRewardTask(actor) + if err != nil { + return err + } } } } @@ -194,3 +206,21 @@ func (s *Scheduler) queueMessageTask(ts types.TipSetKey, st cid.Cid) (*work.Job, "stateroot": st.String(), }) } + +func (s *Scheduler) queueRewardTask(info indexer.ActorInfo) (*work.Job, error) { + tsB, err := info.TipSet.MarshalJSON() + if err != nil { + return nil, err + } + ptsB, err := info.ParentTipSet.MarshalJSON() + if err != nil { + return nil, err + } + return s.queues[RewardTaskName].Enqueue(RewardTaskName, work.Q{ + "ts": string(tsB), + "pts": string(ptsB), + "head": info.Actor.Head.String(), + "stateroot": info.ParentStateRoot.String(), + }) + +} diff --git a/services/processor/tasks/reward/reward.go b/services/processor/tasks/reward/reward.go new file mode 100644 index 000000000..0c495ea23 --- /dev/null +++ b/services/processor/tasks/reward/reward.go @@ -0,0 +1,127 @@ +package reward + +import ( + "bytes" + "context" + "github.com/gocraft/work" + "github.com/gomodule/redigo/redis" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/specs-actors/actors/builtin/reward" + + "github.com/filecoin-project/sentinel-visor/lens" + "github.com/filecoin-project/sentinel-visor/model" + rewardmodel "github.com/filecoin-project/sentinel-visor/model/actors/reward" +) + +func Setup(concurrency uint, taskName, poolName string, redisPool *redis.Pool, node lens.API, pubCh chan<- model.Persistable) (*work.WorkerPool, *work.Enqueuer) { + pool := work.NewWorkerPool(ProcessRewardTask{}, concurrency, poolName, redisPool) + queue := work.NewEnqueuer(poolName, redisPool) + + // https://github.com/gocraft/work/issues/10#issuecomment-237580604 + // adding fields via a closure gives the workers access to the lotus api, a global could also be used here + pool.Middleware(func(mt *ProcessRewardTask, job *work.Job, next work.NextMiddlewareFunc) error { + mt.node = node + mt.pubCh = pubCh + mt.log = logging.Logger("rewardtask") + return next() + }) + logging.SetLogLevel("rewardtask", "debug") + // log all task + pool.Middleware((*ProcessRewardTask).Log) + + // register task method and don't allow retying + pool.JobWithOptions(taskName, work.JobOptions{ + MaxFails: 1, + }, (*ProcessRewardTask).Task) + + return pool, queue +} + +type ProcessRewardTask struct { + node lens.API + log *logging.ZapEventLogger + + pubCh chan<- model.Persistable + + ts types.TipSetKey + head cid.Cid + stateroot cid.Cid +} + +func (p *ProcessRewardTask) Log(job *work.Job, next work.NextMiddlewareFunc) error { + p.log.Infow("starting process reward task", "job", job.ID, "args", job.Args) + return next() +} + +func (p *ProcessRewardTask) ParseArgs(job *work.Job) error { + tsStr := job.ArgString("ts") + if err := job.ArgError(); err != nil { + return err + } + + headStr := job.ArgString("head") + if err := job.ArgError(); err != nil { + return err + } + + srStr := job.ArgString("stateroot") + if err := job.ArgError(); err != nil { + return err + } + + stateroot, err := cid.Decode(srStr) + if err != nil { + return err + } + + head, err := cid.Decode(headStr) + if err != nil { + return err + } + + var tsKey types.TipSetKey + if err := tsKey.UnmarshalJSON([]byte(tsStr)); err != nil { + return err + } + + p.ts = tsKey + p.head = head + p.stateroot = stateroot + return nil +} + +func (p *ProcessRewardTask) Task(job *work.Job) error { + if err := p.ParseArgs(job); err != nil { + return err + } + + ctx := context.TODO() + + rewardStateRaw, err := p.node.ChainReadObj(ctx, p.head) + if err != nil { + return err + } + + var rwdState reward.State + if err := rwdState.UnmarshalCBOR(bytes.NewReader(rewardStateRaw)); err != nil { + return err + } + + p.pubCh <- &rewardmodel.ChainReward{ + StateRoot: p.stateroot.String(), + CumSumBaseline: rwdState.CumsumBaseline.String(), + CumSumRealized: rwdState.CumsumRealized.String(), + EffectiveBaselinePower: rwdState.EffectiveBaselinePower.String(), + NewBaselinePower: rwdState.ThisEpochBaselinePower.String(), + NewRewardSmoothedPositionEstimate: rwdState.ThisEpochRewardSmoothed.PositionEstimate.String(), + NewRewardSmoothedVelocityEstimate: rwdState.ThisEpochRewardSmoothed.VelocityEstimate.String(), + TotalMinedReward: rwdState.TotalMined.String(), + NewReward: rwdState.ThisEpochReward.String(), + EffectiveNetworkTime: int64(rwdState.EffectiveNetworkTime), + } + + return nil +} diff --git a/storage/sql.go b/storage/sql.go index 90d85217d..d907e5612 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -2,7 +2,6 @@ package storage import ( "context" - "github.com/filecoin-project/sentinel-visor/model/actors/power" "time" "github.com/go-pg/pg/v10" @@ -12,6 +11,8 @@ import ( "github.com/filecoin-project/sentinel-visor/model/actors/market" "github.com/filecoin-project/sentinel-visor/model/actors/miner" + "github.com/filecoin-project/sentinel-visor/model/actors/power" + "github.com/filecoin-project/sentinel-visor/model/actors/reward" "github.com/filecoin-project/sentinel-visor/model/blocks" "github.com/filecoin-project/sentinel-visor/model/messages" ) @@ -38,6 +39,7 @@ var models = []interface{}{ (*messages.Receipt)(nil), (*power.ChainPower)(nil), + (*reward.ChainReward)(nil), } func NewDatabase(ctx context.Context, url string) (*Database, error) {