From c5f66b5dca18b34e0437ed5f184484c9e9083211 Mon Sep 17 00:00:00 2001 From: Frrist Date: Mon, 21 Sep 2020 02:38:58 -0700 Subject: [PATCH] feat(task): add actor, actor-state, and init actor processing (#14) * feat(task): add actor and actor state processing * feat(task): add init actor processing --- go.mod | 1 + model/actors/common/actors.go | 40 ++++ model/actors/common/task.go | 23 +++ model/actors/init/idaddress.go | 38 ++++ model/genesis/task.go | 22 ++- services/processor/processor.go | 2 +- services/processor/scheduler.go | 80 +++++++- services/processor/tasks/common/actor.go | 193 ++++++++++++++++++++ services/processor/tasks/genesis/genesis.go | 52 +++++- services/processor/tasks/init/init_actor.go | 145 +++++++++++++++ services/processor/tasks/market/market.go | 2 +- storage/sql.go | 6 + 12 files changed, 590 insertions(+), 14 deletions(-) create mode 100644 model/actors/common/actors.go create mode 100644 model/actors/common/task.go create mode 100644 model/actors/init/idaddress.go create mode 100644 services/processor/tasks/common/actor.go create mode 100644 services/processor/tasks/init/init_actor.go diff --git a/go.mod b/go.mod index 01a2303b7..f3ff89547 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/filecoin-project/go-address v0.0.3 github.com/filecoin-project/go-bitfield v0.2.0 github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200822201400-474f4fdccc52 + github.com/filecoin-project/go-state-types v0.0.0-20200905071437-95828685f9df github.com/filecoin-project/lotus v0.6.1 github.com/filecoin-project/specs-actors v0.9.7 github.com/go-pg/pg/v10 v10.0.7 diff --git a/model/actors/common/actors.go b/model/actors/common/actors.go new file mode 100644 index 000000000..490bf1d8b --- /dev/null +++ b/model/actors/common/actors.go @@ -0,0 +1,40 @@ +package common + +import ( + "context" + "github.com/go-pg/pg/v10" +) + +type Actor struct { + ID string `pg:",pk,notnull"` + StateRoot string `pg:",pk,notnull"` + Code string `pg:",notnull"` + Head string `pg:",notnull"` + Balance string `pg:",notnull"` + Nonce uint64 `pg:",use_zero"` +} + +func (a *Actor) PersistWithTx(ctx context.Context, tx *pg.Tx) error { + if _, err := tx.ModelContext(ctx, a). + OnConflict("do nothing"). + Insert(); err != nil { + return err + } + return nil +} + +type ActorState struct { + Head string `pg:",pk,notnull"` + Code string `pg:",pk,notnull"` + State string `pg:",notnull"` +} + +func (s *ActorState) PersistWithTx(ctx context.Context, tx *pg.Tx) error { + if _, err := tx.ModelContext(ctx, s). + OnConflict("do nothing"). + Insert(); err != nil { + return err + } + return nil + +} diff --git a/model/actors/common/task.go b/model/actors/common/task.go new file mode 100644 index 000000000..605d558dc --- /dev/null +++ b/model/actors/common/task.go @@ -0,0 +1,23 @@ +package common + +import ( + "context" + "github.com/go-pg/pg/v10" +) + +type ActorTaskResult struct { + Actor *Actor + State *ActorState +} + +func (a *ActorTaskResult) Persist(ctx context.Context, db *pg.DB) error { + return db.RunInTransaction(ctx, func(tx *pg.Tx) error { + if err := a.Actor.PersistWithTx(ctx, tx); err != nil { + return err + } + if err := a.State.PersistWithTx(ctx, tx); err != nil { + return err + } + return nil + }) +} diff --git a/model/actors/init/idaddress.go b/model/actors/init/idaddress.go new file mode 100644 index 000000000..e6a9ad96a --- /dev/null +++ b/model/actors/init/idaddress.go @@ -0,0 +1,38 @@ +package init + +import ( + "context" + "github.com/go-pg/pg/v10" +) + +type IdAddress struct { + ID string `pg:",pk,notnull"` + Address string `pg:",pk,notnull"` + StateRoot string `pg:",pk,notnull"` +} + +func (ia *IdAddress) PersistWithTx(ctx context.Context, tx *pg.Tx) error { + if _, err := tx.ModelContext(ctx, ia). + OnConflict("do nothing"). + Insert(); err != nil { + return err + } + return nil +} + +type IdAddressList []*IdAddress + +func (ias IdAddressList) PersistWithTx(ctx context.Context, tx *pg.Tx) error { + for _, ia := range ias { + if err := ia.PersistWithTx(ctx, tx); err != nil { + return err + } + } + return nil +} + +func (ias IdAddressList) Persist(ctx context.Context, db *pg.DB) error { + return db.RunInTransaction(ctx, func(tx *pg.Tx) error { + return ias.PersistWithTx(ctx, tx) + }) +} diff --git a/model/genesis/task.go b/model/genesis/task.go index 58d11711f..84e8a3c31 100644 --- a/model/genesis/task.go +++ b/model/genesis/task.go @@ -3,6 +3,7 @@ package genesis import ( "context" + init_ "github.com/filecoin-project/sentinel-visor/model/actors/init" "github.com/filecoin-project/sentinel-visor/model/actors/market" "github.com/filecoin-project/sentinel-visor/model/actors/miner" "github.com/go-pg/pg/v10" @@ -10,8 +11,9 @@ import ( ) type ProcessGenesisSingletonResult struct { - minerResults []*GenesisMinerTaskResult - marketResult *GenesisMarketTaskResult + minerResults []*GenesisMinerTaskResult + marketResult *GenesisMarketTaskResult + initActorResult *GenesisInitActorTaskResult } func (r *ProcessGenesisSingletonResult) Persist(ctx context.Context, db *pg.DB) error { @@ -40,6 +42,11 @@ func (r *ProcessGenesisSingletonResult) Persist(ctx context.Context, db *pg.DB) return err } } + if r.initActorResult != nil { + if err := r.initActorResult.AddressMap.PersistWithTx(ctx, tx); err != nil { + return err + } + } return nil }) } @@ -55,6 +62,13 @@ func (r *ProcessGenesisSingletonResult) SetMarket(m *GenesisMarketTaskResult) { r.marketResult = m } +func (r *ProcessGenesisSingletonResult) SetInitActor(m *GenesisInitActorTaskResult) { + if r.initActorResult != nil { + panic("Genesis InitActor State already set, developer error!!!") + } + r.initActorResult = m +} + type GenesisMinerTaskResult struct { StateModel *miner.MinerState PowerModel *miner.MinerPower @@ -66,3 +80,7 @@ type GenesisMarketTaskResult struct { DealModels market.MarketDealStates ProposalModesl market.MarketDealProposals } + +type GenesisInitActorTaskResult struct { + AddressMap init_.IdAddressList +} diff --git a/services/processor/processor.go b/services/processor/processor.go index 41ba05c4f..20bb28554 100644 --- a/services/processor/processor.go +++ b/services/processor/processor.go @@ -2,7 +2,6 @@ package processor import ( "context" - "github.com/filecoin-project/lotus/lib/parmap" "strings" "sync" "time" @@ -15,6 +14,7 @@ import ( "github.com/filecoin-project/go-address" types "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/lib/parmap" "github.com/filecoin-project/sentinel-visor/storage" "github.com/filecoin-project/sentinel-visor/lens" diff --git a/services/processor/scheduler.go b/services/processor/scheduler.go index b82083a8f..b60de12b9 100644 --- a/services/processor/scheduler.go +++ b/services/processor/scheduler.go @@ -4,6 +4,7 @@ import ( "github.com/gocraft/work" "github.com/gomodule/redigo/redis" "github.com/ipfs/go-cid" + "strconv" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/specs-actors/actors/builtin" @@ -11,7 +12,9 @@ import ( "github.com/filecoin-project/sentinel-visor/lens" "github.com/filecoin-project/sentinel-visor/model" "github.com/filecoin-project/sentinel-visor/services/indexer" + "github.com/filecoin-project/sentinel-visor/services/processor/tasks/common" "github.com/filecoin-project/sentinel-visor/services/processor/tasks/genesis" + init_ "github.com/filecoin-project/sentinel-visor/services/processor/tasks/init" "github.com/filecoin-project/sentinel-visor/services/processor/tasks/market" "github.com/filecoin-project/sentinel-visor/services/processor/tasks/message" "github.com/filecoin-project/sentinel-visor/services/processor/tasks/miner" @@ -21,7 +24,10 @@ import ( const ( GenesisTaskName = "process_genesis" - GensisPoolName = "genesis_tasks" + GenesisPoolName = "genesis_tasks" + + InitActorTaskName = "process_init_actor" + InitActorPoolName = "init_actor_tasks" MinerTaskName = "process_miner" MinerPoolName = "miner_actor_tasks" @@ -37,6 +43,9 @@ const ( RewardTaskName = "process_reward" RewardPoolName = "reward_tasks" + + CommonTaskName = "process_common" + CommonPoolName = "common_actor_tasks" ) // Make a redis pool @@ -50,22 +59,26 @@ var redisPool = &redis.Pool{ } func NewScheduler(node lens.API, pubCh chan<- model.Persistable) *Scheduler { - genesisPool, genesisQueue := genesis.Setup(1, GenesisTaskName, GensisPoolName, redisPool, node, pubCh) + genesisPool, genesisQueue := genesis.Setup(1, GenesisTaskName, GenesisPoolName, redisPool, node, pubCh) minerPool, minerQueue := miner.Setup(64, MinerTaskName, MinerPoolName, redisPool, node, pubCh) marketPool, marketQueue := market.Setup(64, MarketTaskName, MarketPoolName, redisPool, node, pubCh) msgPool, msgQueue := message.Setup(64, MessageTaskName, MessagePoolName, redisPool, node, pubCh) powerPool, powerQueue := power.Setup(64, PowerTaskName, PowerPoolName, redisPool, node, pubCh) rwdPool, rwdQueue := reward.Setup(4, RewardTaskName, RewardPoolName, redisPool, node, pubCh) + comPool, comQueue := common.Setup(64, CommonTaskName, CommonPoolName, redisPool, node, pubCh) + initPool, initQueue := init_.Setup(64, InitActorTaskName, InitActorPoolName, redisPool, node, pubCh) - pools := []*work.WorkerPool{genesisPool, minerPool, marketPool, powerPool, msgPool, rwdPool} + pools := []*work.WorkerPool{genesisPool, minerPool, marketPool, powerPool, msgPool, rwdPool, comPool, initPool} queues := map[string]*work.Enqueuer{ - GenesisTaskName: genesisQueue, - MinerTaskName: minerQueue, - MarketTaskName: marketQueue, - MessageTaskName: msgQueue, - PowerTaskName: powerQueue, - RewardTaskName: rwdQueue, + GenesisTaskName: genesisQueue, + MinerTaskName: minerQueue, + MarketTaskName: marketQueue, + MessageTaskName: msgQueue, + PowerTaskName: powerQueue, + RewardTaskName: rwdQueue, + CommonTaskName: comQueue, + InitActorTaskName: initQueue, } return &Scheduler{ @@ -102,7 +115,16 @@ func (s *Scheduler) Dispatch(tips indexer.ActorTips) error { return err } for _, actor := range actors { + _, err := s.queueCommonTask(actor) + if err != nil { + return err + } switch actor.Actor.Code { + case builtin.InitActorCodeID: + _, err := s.queueInitTask(actor) + if err != nil { + return err + } case builtin.StorageMinerActorCodeID: _, err := s.queueMinerTask(actor) if err != nil { @@ -182,6 +204,24 @@ func (s *Scheduler) queueMarketTask(info indexer.ActorInfo) (*work.Job, error) { "address": info.Address.String(), "stateroot": info.ParentStateRoot.String(), }) +} + +func (s *Scheduler) queueInitTask(info indexer.ActorInfo) (*work.Job, error) { + tsB, err := info.TipSet.MarshalJSON() + if err != nil { + return nil, err + } + ptsB, err := info.ParentTipSet.MarshalJSON() + if err != nil { + return nil, err + } + return s.queues[InitActorTaskName].Enqueue(InitActorTaskName, work.Q{ + "ts": string(tsB), + "pts": string(ptsB), + "head": info.Actor.Head.String(), + "address": info.Address.String(), + "stateroot": info.ParentStateRoot.String(), + }) } @@ -216,11 +256,33 @@ func (s *Scheduler) queueRewardTask(info indexer.ActorInfo) (*work.Job, error) { if err != nil { return nil, err } + return s.queues[RewardTaskName].Enqueue(RewardTaskName, work.Q{ "ts": string(tsB), "pts": string(ptsB), "head": info.Actor.Head.String(), "stateroot": info.ParentStateRoot.String(), }) +} +func (s *Scheduler) queueCommonTask(info indexer.ActorInfo) (*work.Job, error) { + tsB, err := info.TipSet.MarshalJSON() + if err != nil { + return nil, err + } + ptsB, err := info.ParentTipSet.MarshalJSON() + if err != nil { + return nil, err + } + + return s.queues[CommonTaskName].Enqueue(CommonTaskName, work.Q{ + "ts": string(tsB), + "pts": string(ptsB), + "stateroot": info.ParentStateRoot.String(), + "address": info.Address.String(), + "head": info.Actor.Head.String(), + "code": info.Actor.Code.String(), + "balance": info.Actor.Balance.String(), + "nonce": strconv.FormatUint(info.Actor.Nonce, 10), + }) } diff --git a/services/processor/tasks/common/actor.go b/services/processor/tasks/common/actor.go new file mode 100644 index 000000000..6bc3dc0b3 --- /dev/null +++ b/services/processor/tasks/common/actor.go @@ -0,0 +1,193 @@ +package common + +import ( + "context" + "encoding/json" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/big" + lapi "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/sentinel-visor/lens" + "github.com/filecoin-project/sentinel-visor/model" + "github.com/filecoin-project/specs-actors/actors/builtin" + "github.com/gocraft/work" + "github.com/gomodule/redigo/redis" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + "strconv" + + commonmodel "github.com/filecoin-project/sentinel-visor/model/actors/common" +) + +func Setup(concurrency uint, taskName, poolName string, redisPool *redis.Pool, node lens.API, pubCh chan<- model.Persistable) (*work.WorkerPool, *work.Enqueuer) { + pool := work.NewWorkerPool(ProcessActorTask{}, concurrency, poolName, redisPool) + queue := work.NewEnqueuer(poolName, redisPool) + + // https://github.com/gocraft/work/issues/10#issuecomment-237580604 + // adding fields via a closure gives the workers access to the lotus api, a global could also be used here + pool.Middleware(func(mt *ProcessActorTask, job *work.Job, next work.NextMiddlewareFunc) error { + mt.node = node + mt.pubCh = pubCh + mt.log = logging.Logger("commonactortask") + return next() + }) + logging.SetLogLevel("commonactortask", "info") + // log all task + pool.Middleware((*ProcessActorTask).Log) + + // register task method and don't allow retying + pool.JobWithOptions(taskName, work.JobOptions{ + MaxFails: 1, + }, (*ProcessActorTask).Task) + + return pool, queue +} + +type ProcessActorTask struct { + node lapi.FullNode + log *logging.ZapEventLogger + + pubCh chan<- model.Persistable + + tsKey types.TipSetKey + ptsKey types.TipSetKey + stateroot cid.Cid + addr address.Address + head cid.Cid + code cid.Cid + balance big.Int + nonce uint64 +} + +func (p *ProcessActorTask) Log(job *work.Job, next work.NextMiddlewareFunc) error { + p.log.Infow("starting common actor task", "job", job.ID, "args", job.Args) + return next() +} + +func (p *ProcessActorTask) ParseArgs(job *work.Job) error { + // this needs a better pattern.... + tsStr := job.ArgString("ts") + if err := job.ArgError(); err != nil { + return err + } + + ptsStr := job.ArgString("pts") + if err := job.ArgError(); err != nil { + return err + } + + srStr := job.ArgString("stateroot") + if err := job.ArgError(); err != nil { + return err + } + + addrStr := job.ArgString("address") + if err := job.ArgError(); err != nil { + return err + } + + headStr := job.ArgString("head") + if err := job.ArgError(); err != nil { + return err + } + + codeStr := job.ArgString("code") + if err := job.ArgError(); err != nil { + return err + } + + balStr := job.ArgString("balance") + if err := job.ArgError(); err != nil { + return err + } + + nonceStr := job.ArgString("nonce") + if err := job.ArgError(); err != nil { + return err + } + + var tsKey types.TipSetKey + if err := tsKey.UnmarshalJSON([]byte(tsStr)); err != nil { + return err + } + + var ptsKey types.TipSetKey + if err := ptsKey.UnmarshalJSON([]byte(ptsStr)); err != nil { + return err + } + + stateroot, err := cid.Decode(srStr) + if err != nil { + return err + } + + addr, err := address.NewFromString(addrStr) + if err != nil { + return err + } + + head, err := cid.Decode(headStr) + if err != nil { + return err + } + + code, err := cid.Decode(codeStr) + if err != nil { + return err + } + + balance, err := big.FromString(balStr) + if err != nil { + return err + } + + nonce, err := strconv.ParseUint(nonceStr, 10, 64) + if err != nil { + return err + } + + p.tsKey = tsKey + p.ptsKey = ptsKey + p.stateroot = stateroot + p.addr = addr + p.head = head + p.code = code + p.balance = balance + p.nonce = nonce + return nil +} + +func (p *ProcessActorTask) Task(job *work.Job) error { + if err := p.ParseArgs(job); err != nil { + return err + } + + ctx := context.TODO() + + ast, err := p.node.StateReadState(ctx, p.addr, p.tsKey) + if err != nil { + return err + } + + state, err := json.Marshal(ast.State) + if err != nil { + return err + } + + p.pubCh <- &commonmodel.ActorTaskResult{ + Actor: &commonmodel.Actor{ + ID: p.addr.String(), + StateRoot: p.stateroot.String(), + Code: builtin.ActorNameByCode(p.code), + Head: p.head.String(), + Balance: p.balance.String(), + Nonce: p.nonce, + }, + State: &commonmodel.ActorState{ + Head: p.head.String(), + Code: p.code.String(), + State: string(state), + }, + } + return nil +} diff --git a/services/processor/tasks/genesis/genesis.go b/services/processor/tasks/genesis/genesis.go index 61d897121..3516f75e9 100644 --- a/services/processor/tasks/genesis/genesis.go +++ b/services/processor/tasks/genesis/genesis.go @@ -3,6 +3,9 @@ package genesis import ( "bytes" "context" + initmodel "github.com/filecoin-project/sentinel-visor/model/actors/init" + "github.com/filecoin-project/specs-actors/actors/util/adt" + typegen "github.com/whyrusleeping/cbor-gen" "strconv" "github.com/gocraft/work" @@ -13,6 +16,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/specs-actors/actors/builtin" + init_ "github.com/filecoin-project/specs-actors/actors/builtin/init" "github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/sentinel-visor/lens" @@ -107,7 +111,11 @@ func (p *ProcessGenesisSingletonTask) Task(job *work.Job) error { case builtin.SystemActorCodeID: // TODO case builtin.InitActorCodeID: - // TODO + res, err := p.initActorState(ctx) + if err != nil { + return err + } + result.SetInitActor(res) case builtin.CronActorCodeID: // TODO case builtin.AccountActorCodeID: @@ -213,6 +221,48 @@ func (p *ProcessGenesisSingletonTask) storageMinerState(ctx context.Context, add }, nil } +func (p *ProcessGenesisSingletonTask) initActorState(ctx context.Context) (*genesismodel.GenesisInitActorTaskResult, error) { + initActor, err := p.node.StateGetActor(ctx, builtin.InitActorAddr, p.genesis) + if err != nil { + return nil, err + } + initActorRaw, err := p.node.ChainReadObj(ctx, initActor.Head) + if err != nil { + return nil, err + } + var initActorState init_.State + if err := initActorState.UnmarshalCBOR(bytes.NewReader(initActorRaw)); err != nil { + return nil, err + } + + addrMap, err := adt.AsMap(p.node.Store(), initActorState.AddressMap) + if err != nil { + return nil, err + } + + out := initmodel.IdAddressList{} + var actorID typegen.CborInt + if err := addrMap.ForEach(&actorID, func(key string) error { + longAddr, err := address.NewFromBytes([]byte(key)) + if err != nil { + return err + } + shortAddr, err := address.NewIDAddress(uint64(actorID)) + if err != nil { + return err + } + out = append(out, &initmodel.IdAddress{ + ID: shortAddr.String(), + Address: longAddr.String(), + StateRoot: p.stateroot.String(), + }) + return nil + }); err != nil { + return nil, err + } + return &genesismodel.GenesisInitActorTaskResult{AddressMap: out}, nil +} + func (p *ProcessGenesisSingletonTask) storageMarketState(ctx context.Context) (*genesismodel.GenesisMarketTaskResult, error) { dealStates, err := p.node.StateMarketDeals(ctx, p.genesis) if err != nil { diff --git a/services/processor/tasks/init/init_actor.go b/services/processor/tasks/init/init_actor.go new file mode 100644 index 000000000..8f0f101f2 --- /dev/null +++ b/services/processor/tasks/init/init_actor.go @@ -0,0 +1,145 @@ +package init + +import ( + "context" + "fmt" + + "github.com/filecoin-project/lotus/chain/events/state" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/sentinel-visor/lens" + "github.com/filecoin-project/sentinel-visor/model" + initmodel "github.com/filecoin-project/sentinel-visor/model/actors/init" + "github.com/gocraft/work" + "github.com/gomodule/redigo/redis" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" +) + +func Setup(concurrency uint, taskName, poolName string, redisPool *redis.Pool, node lens.API, pubCh chan<- model.Persistable) (*work.WorkerPool, *work.Enqueuer) { + pool := work.NewWorkerPool(ProcessInitActorTask{}, concurrency, poolName, redisPool) + queue := work.NewEnqueuer(poolName, redisPool) + + // https://github.com/gocraft/work/issues/10#issuecomment-237580604 + // adding fields via a closure gives the workers access to the lotus api, a global could also be used here + pool.Middleware(func(mt *ProcessInitActorTask, job *work.Job, next work.NextMiddlewareFunc) error { + mt.node = node + mt.pubCh = pubCh + mt.log = logging.Logger("markettask") + return next() + }) + logging.SetLogLevel("markettask", "info") + // log all task + pool.Middleware((*ProcessInitActorTask).Log) + + // register task method and don't allow retying + pool.JobWithOptions(taskName, work.JobOptions{ + MaxFails: 1, + }, (*ProcessInitActorTask).Task) + + return pool, queue +} + +type ProcessInitActorTask struct { + node lens.API + log *logging.ZapEventLogger + + pubCh chan<- model.Persistable + + head cid.Cid + stateroot cid.Cid + tsKey types.TipSetKey + ptsKey types.TipSetKey +} + +func (p *ProcessInitActorTask) Log(job *work.Job, next work.NextMiddlewareFunc) error { + p.log.Infow("starting init actor task", "job", job.ID, "args", job.Args) + return next() +} + +func (p *ProcessInitActorTask) ParseArgs(job *work.Job) error { + headStr := job.ArgString("head") + if err := job.ArgError(); err != nil { + return err + } + + srStr := job.ArgString("stateroot") + if err := job.ArgError(); err != nil { + return err + } + + tsStr := job.ArgString("ts") + if err := job.ArgError(); err != nil { + return err + } + + ptsStr := job.ArgString("pts") + if err := job.ArgError(); err != nil { + return err + } + + mhead, err := cid.Decode(headStr) + if err != nil { + return err + } + + mstateroot, err := cid.Decode(srStr) + if err != nil { + return err + } + + var tsKey types.TipSetKey + if err := tsKey.UnmarshalJSON([]byte(tsStr)); err != nil { + return err + } + + var ptsKey types.TipSetKey + if err := ptsKey.UnmarshalJSON([]byte(ptsStr)); err != nil { + return err + } + + p.head = mhead + p.tsKey = tsKey + p.ptsKey = ptsKey + p.stateroot = mstateroot + return nil +} + +func (p *ProcessInitActorTask) Task(job *work.Job) error { + if err := p.ParseArgs(job); err != nil { + return err + } + + ctx := context.TODO() + + pred := state.NewStatePredicates(p.node) + stateDiff := pred.OnInitActorChange(pred.OnAddressMapChange()) + changed, val, err := stateDiff(ctx, p.ptsKey, p.tsKey) + if err != nil { + return err + } + if !changed { + return err + } + changes, ok := val.(*state.InitActorAddressChanges) + if !ok { + return fmt.Errorf("unknown type returned by init acotr hamt predicate: %T", val) + } + + out := make(initmodel.IdAddressList, len(changes.Added)+len(changes.Modified)) + for idx, add := range changes.Added { + out[idx] = &initmodel.IdAddress{ + ID: add.ID.String(), + Address: add.PK.String(), + StateRoot: p.stateroot.String(), + } + } + for idx, mod := range changes.Modified { + out[idx] = &initmodel.IdAddress{ + ID: mod.To.ID.String(), + Address: mod.To.PK.String(), + StateRoot: p.stateroot.String(), + } + } + p.pubCh <- out + return nil +} diff --git a/services/processor/tasks/market/market.go b/services/processor/tasks/market/market.go index cf98f219b..e8ebbc91a 100644 --- a/services/processor/tasks/market/market.go +++ b/services/processor/tasks/market/market.go @@ -26,7 +26,7 @@ func Setup(concurrency uint, taskName, poolName string, redisPool *redis.Pool, n pool.Middleware(func(mt *ProcessMarketTask, job *work.Job, next work.NextMiddlewareFunc) error { mt.node = node mt.pubCh = pubCh - mt.log = logging.Logger("minertask") + mt.log = logging.Logger("markettask") return next() }) logging.SetLogLevel("markettask", "info") diff --git a/storage/sql.go b/storage/sql.go index d907e5612..331470c4f 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -9,6 +9,8 @@ import ( "github.com/go-pg/pgext" "golang.org/x/xerrors" + "github.com/filecoin-project/sentinel-visor/model/actors/common" + init_ "github.com/filecoin-project/sentinel-visor/model/actors/init" "github.com/filecoin-project/sentinel-visor/model/actors/market" "github.com/filecoin-project/sentinel-visor/model/actors/miner" "github.com/filecoin-project/sentinel-visor/model/actors/power" @@ -40,6 +42,10 @@ var models = []interface{}{ (*power.ChainPower)(nil), (*reward.ChainReward)(nil), + (*common.Actor)(nil), + (*common.ActorState)(nil), + + (*init_.IdAddress)(nil), } func NewDatabase(ctx context.Context, url string) (*Database, error) {