diff --git a/chain/message.go b/chain/message.go index 6ab8254e7..c6b43e752 100644 --- a/chain/message.go +++ b/chain/message.go @@ -21,6 +21,7 @@ import ( derivedmodel "github.com/filecoin-project/sentinel-visor/model/derived" messagemodel "github.com/filecoin-project/sentinel-visor/model/messages" visormodel "github.com/filecoin-project/sentinel-visor/model/visor" + "github.com/filecoin-project/sentinel-visor/tasks/actorstate" ) type MessageProcessor struct { @@ -164,23 +165,20 @@ func (p *MessageProcessor) processExecutedMessages(ctx context.Context, ts, pts outputs := p.node.ComputeGasOutputs(m.Receipt.GasUsed, m.Message.GasLimit, m.BlockHeader.ParentBaseFee, m.Message.GasFeeCap, m.Message.GasPremium) gasOutput := &derivedmodel.GasOutputs{ - // TODO: add Height and ActorName as per https://github.com/filecoin-project/sentinel-visor/pull/270 - // Height: msg.Height, - Cid: msg.Cid, - From: msg.From, - To: msg.To, - Value: msg.Value, - GasFeeCap: msg.GasFeeCap, - GasPremium: msg.GasPremium, - GasLimit: msg.GasLimit, - Nonce: msg.Nonce, - Method: msg.Method, - StateRoot: m.BlockHeader.ParentStateRoot.String(), - ExitCode: rcpt.ExitCode, - GasUsed: rcpt.GasUsed, - ParentBaseFee: m.BlockHeader.ParentBaseFee.String(), - - // TODO: is SizeBytes really needed here? + Height: msg.Height, + Cid: msg.Cid, + From: msg.From, + To: msg.To, + Value: msg.Value, + GasFeeCap: msg.GasFeeCap, + GasPremium: msg.GasPremium, + GasLimit: msg.GasLimit, + Nonce: msg.Nonce, + Method: msg.Method, + StateRoot: m.BlockHeader.ParentStateRoot.String(), + ExitCode: rcpt.ExitCode, + GasUsed: rcpt.GasUsed, + ParentBaseFee: m.BlockHeader.ParentBaseFee.String(), SizeBytes: msgSize, BaseFeeBurn: outputs.BaseFeeBurn.String(), OverEstimationBurn: outputs.OverEstimationBurn.String(), @@ -189,6 +187,7 @@ func (p *MessageProcessor) processExecutedMessages(ctx context.Context, ts, pts Refund: outputs.Refund.String(), GasRefund: outputs.GasRefund, GasBurned: outputs.GasBurned, + ActorName: actorstate.ActorNameByCode(m.ToActorCode), } gasOutputsResults = append(gasOutputsResults, gasOutput) diff --git a/model/derived/gasoutputs.go b/model/derived/gasoutputs.go index 2e3af8056..a910ed3f0 100644 --- a/model/derived/gasoutputs.go +++ b/model/derived/gasoutputs.go @@ -12,7 +12,9 @@ import ( type GasOutputs struct { tableName struct{} `pg:"derived_gas_outputs"` //nolint: structcheck,unused + Height int64 `pg:",pk,use_zero,notnull"` Cid string `pg:",pk,notnull"` + StateRoot string `pg:",pk,notnull"` From string `pg:",notnull"` To string `pg:",notnull"` Value string `pg:",notnull"` @@ -22,7 +24,7 @@ type GasOutputs struct { SizeBytes int `pg:",use_zero,notnull"` Nonce uint64 `pg:",use_zero,notnull"` Method uint64 `pg:",use_zero,notnull"` - StateRoot string `pg:",notnull"` + ActorName string `pg:",notnull"` ExitCode int64 `pg:",use_zero,notnull"` GasUsed int64 `pg:",use_zero,notnull"` ParentBaseFee string `pg:",notnull"` @@ -65,8 +67,3 @@ func (l GasOutputsList) PersistWithTx(ctx context.Context, tx *pg.Tx) error { } return nil } - -type ProcessingGasOutputs struct { - Height int64 - GasOutputs -} diff --git a/storage/migrations/25_gasoutputs_height_code.go b/storage/migrations/25_gasoutputs_height_code.go new file mode 100644 index 000000000..acf691e40 --- /dev/null +++ b/storage/migrations/25_gasoutputs_height_code.go @@ -0,0 +1,43 @@ +package migrations + +import "github.com/go-pg/migrations/v8" + +// Schema version 25 adds Height and ActorName to gas outputs table + +func init() { + up := batch(` + + DO $$ + BEGIN + IF ( + SELECT count(*) + FROM information_schema.constraint_column_usage + WHERE table_schema = 'public' + AND table_name = 'derived_gas_outputs' + AND constraint_name='derived_gas_outputs_pkey' + AND column_name IN ('height','cid','state_root') + ) != 3 -- want all three columns in the index + THEN + -- Can't change primary key while data exists + TRUNCATE TABLE public.derived_gas_outputs; + + ALTER TABLE public.derived_gas_outputs ADD COLUMN height bigint NOT NULL; + ALTER TABLE public.derived_gas_outputs ADD COLUMN actor_name text NOT NULL; + ALTER TABLE public.derived_gas_outputs DROP CONSTRAINT derived_gas_outputs_pkey; + ALTER TABLE public.derived_gas_outputs ADD PRIMARY KEY (height,cid,state_root); + END IF; + END + $$; + + +`) + down := batch(` + TRUNCATE TABLE public.derived_gas_outputs; + ALTER TABLE public.derived_gas_outputs DROP CONSTRAINT derived_gas_outputs_pkey; + ALTER TABLE public.derived_gas_outputs ADD PRIMARY KEY (cid); + ALTER TABLE public.derived_gas_outputs DROP COLUMN height; + ALTER TABLE public.derived_gas_outputs DROP COLUMN actor_name; +`) + + migrations.MustRegisterTx(up, down) +} diff --git a/storage/sql.go b/storage/sql.go index 18e0a76cd..53d23147b 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -540,10 +540,10 @@ func useNullIfEmpty(s string) *string { } // LeaseGasOutputsMessages leases a set of messages that have receipts for gas output processing. minHeight and maxHeight define an inclusive range of heights to process. -func (d *Database) LeaseGasOutputsMessages(ctx context.Context, claimUntil time.Time, batchSize int, minHeight, maxHeight int64) ([]*derived.ProcessingGasOutputs, error) { +func (d *Database) LeaseGasOutputsMessages(ctx context.Context, claimUntil time.Time, batchSize int, minHeight, maxHeight int64) ([]*derived.GasOutputs, error) { stop := metrics.Timer(ctx, metrics.BatchSelectionDuration) defer stop() - var list []*derived.ProcessingGasOutputs + var list []*derived.GasOutputs if err := d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error { _, err := tx.QueryContext(ctx, &list, ` @@ -583,11 +583,11 @@ SELECT * FROM leased; } // FindGasOutputsMessages finds a set of messages that have receipts for gas output processing but does not take a lease out. minHeight and maxHeight define an inclusive range of heights to process. -func (d *Database) FindGasOutputsMessages(ctx context.Context, batchSize int, minHeight, maxHeight int64) ([]*derived.ProcessingGasOutputs, error) { +func (d *Database) FindGasOutputsMessages(ctx context.Context, batchSize int, minHeight, maxHeight int64) ([]*derived.GasOutputs, error) { stop := metrics.Timer(ctx, metrics.BatchSelectionDuration) defer stop() - var list []*derived.ProcessingGasOutputs + var list []*derived.GasOutputs if err := d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error { _, err := tx.QueryContext(ctx, &list, ` diff --git a/tasks/message/gasoutputs.go b/tasks/message/gasoutputs.go index 7b4a7560c..4bbfb8900 100644 --- a/tasks/message/gasoutputs.go +++ b/tasks/message/gasoutputs.go @@ -2,10 +2,16 @@ package message import ( "context" + "errors" "time" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/lotus/chain/state" + "github.com/filecoin-project/lotus/chain/types" "github.com/go-pg/pg/v10" + "github.com/ipfs/go-cid" "github.com/raulk/clock" "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" @@ -15,6 +21,7 @@ import ( "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model/derived" "github.com/filecoin-project/sentinel-visor/storage" + "github.com/filecoin-project/sentinel-visor/tasks/actorstate" "github.com/filecoin-project/sentinel-visor/wait" ) @@ -65,7 +72,7 @@ func (p *GasOutputsProcessor) processBatch(ctx context.Context, node lens.API) ( claimUntil := p.clock.Now().Add(p.leaseLength) - var batch []*derived.ProcessingGasOutputs + var batch []*derived.GasOutputs var err error if p.useLeases { @@ -104,7 +111,7 @@ func (p *GasOutputsProcessor) processBatch(ctx context.Context, node lens.API) ( errorLog := log.With("cid", item.Cid) - if err := p.processItem(ctx, node, &item.GasOutputs); err != nil { + if err := p.processItem(ctx, node, item); err != nil { // Any errors are likely to be problems using the lens, mark this tipset as failed and exit this batch errorLog.Errorw("failed to process message", "error", err.Error()) if err := p.storage.MarkGasOutputsMessagesComplete(ctx, item.Height, item.Cid, p.clock.Now(), err.Error()); err != nil { @@ -125,6 +132,35 @@ func (p *GasOutputsProcessor) processItem(ctx context.Context, node lens.API, it stop := metrics.Timer(ctx, metrics.ProcessingDuration) defer stop() + // Note: this item will only be processed if there are receipts for + // it, which means there should be a tipset at height+1. This is only + // used to get the destination actor code, so we don't care about side + // chains. + child, err := node.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(item.Height+1), types.NewTipSetKey()) + if err != nil { + return xerrors.Errorf("Failed to load child tipset: %w", err) + } + + st, err := state.LoadStateTree(node.Store(), child.ParentState()) + if err != nil { + return xerrors.Errorf("load state tree when gas outputs for %s: %w", item.Cid, err) + } + + dstAddr, err := address.NewFromString(item.To) + if err != nil { + return xerrors.Errorf("parse to address failed for gas outputs in %s: %w", item.Cid, err) + } + + var dstActorCode cid.Cid + dstActor, err := st.GetActor(dstAddr) + if err != nil { + if !errors.Is(err, types.ErrActorNotFound) { + return xerrors.Errorf("get destination actor for gas outputs %s failed: %w", item.Cid, err) + } + } else { + dstActorCode = dstActor.Code + } + baseFee, err := big.FromString(item.ParentBaseFee) if err != nil { return xerrors.Errorf("parse fee cap: %w", err) @@ -144,6 +180,7 @@ func (p *GasOutputsProcessor) processItem(ctx context.Context, node lens.API, it outputs := node.ComputeGasOutputs(item.GasUsed, item.GasLimit, baseFee, feeCap, gasPremium) cgoStop() + item.ActorName = actorstate.ActorNameByCode(dstActorCode) item.BaseFeeBurn = outputs.BaseFeeBurn.String() item.OverEstimationBurn = outputs.OverEstimationBurn.String() item.MinerPenalty = outputs.MinerPenalty.String()