From 627e829de22b6596605d47195c42f1cdaa1f0169 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Thu, 3 Dec 2020 13:51:38 +0100 Subject: [PATCH] feat(gas outputs): Add Height and ActorName (#270) * Feat(gas outputs): Add Height and ActorName Gas outputs are very important for many base-fee and gas-usage related views. The table has no height, which means it needs to be joined with blocks all the time on something like state_root to get a timestamp. This is likely innefficient. Additionally, it needs to be joined with actors to get the actor code. Since we are "deriving" and already include redundant info, we might also get this one. This allows easy filtering of messages by actor type and method. * feat(gas outputs): Migrations: add height and actor_name columns * Migration 22: set height to bigint * feat(migrations): add constrains to new derived_gas_outputs columns * feat(migrations): add height column as primary key in derived_gas_outputs. * feat(migrations): re-index gasoutputs_height_code migration to 24. * feat(migrations): fix gas_output_migration. * Add state_root to derived_gas_outputs index Co-authored-by: Ian Davis --- chain/message.go | 33 +++++++------- model/derived/gasoutputs.go | 9 ++-- .../migrations/25_gasoutputs_height_code.go | 43 +++++++++++++++++++ storage/sql.go | 8 ++-- tasks/message/gasoutputs.go | 41 +++++++++++++++++- 5 files changed, 105 insertions(+), 29 deletions(-) create mode 100644 storage/migrations/25_gasoutputs_height_code.go diff --git a/chain/message.go b/chain/message.go index 6ab8254e7..c6b43e752 100644 --- a/chain/message.go +++ b/chain/message.go @@ -21,6 +21,7 @@ import ( derivedmodel "github.com/filecoin-project/sentinel-visor/model/derived" messagemodel "github.com/filecoin-project/sentinel-visor/model/messages" visormodel "github.com/filecoin-project/sentinel-visor/model/visor" + "github.com/filecoin-project/sentinel-visor/tasks/actorstate" ) type MessageProcessor struct { @@ -164,23 +165,20 @@ func (p *MessageProcessor) processExecutedMessages(ctx context.Context, ts, pts outputs := p.node.ComputeGasOutputs(m.Receipt.GasUsed, m.Message.GasLimit, m.BlockHeader.ParentBaseFee, m.Message.GasFeeCap, m.Message.GasPremium) gasOutput := &derivedmodel.GasOutputs{ - // TODO: add Height and ActorName as per https://github.com/filecoin-project/sentinel-visor/pull/270 - // Height: msg.Height, - Cid: msg.Cid, - From: msg.From, - To: msg.To, - Value: msg.Value, - GasFeeCap: msg.GasFeeCap, - GasPremium: msg.GasPremium, - GasLimit: msg.GasLimit, - Nonce: msg.Nonce, - Method: msg.Method, - StateRoot: m.BlockHeader.ParentStateRoot.String(), - ExitCode: rcpt.ExitCode, - GasUsed: rcpt.GasUsed, - ParentBaseFee: m.BlockHeader.ParentBaseFee.String(), - - // TODO: is SizeBytes really needed here? + Height: msg.Height, + Cid: msg.Cid, + From: msg.From, + To: msg.To, + Value: msg.Value, + GasFeeCap: msg.GasFeeCap, + GasPremium: msg.GasPremium, + GasLimit: msg.GasLimit, + Nonce: msg.Nonce, + Method: msg.Method, + StateRoot: m.BlockHeader.ParentStateRoot.String(), + ExitCode: rcpt.ExitCode, + GasUsed: rcpt.GasUsed, + ParentBaseFee: m.BlockHeader.ParentBaseFee.String(), SizeBytes: msgSize, BaseFeeBurn: outputs.BaseFeeBurn.String(), OverEstimationBurn: outputs.OverEstimationBurn.String(), @@ -189,6 +187,7 @@ func (p *MessageProcessor) processExecutedMessages(ctx context.Context, ts, pts Refund: outputs.Refund.String(), GasRefund: outputs.GasRefund, GasBurned: outputs.GasBurned, + ActorName: actorstate.ActorNameByCode(m.ToActorCode), } gasOutputsResults = append(gasOutputsResults, gasOutput) diff --git a/model/derived/gasoutputs.go b/model/derived/gasoutputs.go index 2e3af8056..a910ed3f0 100644 --- a/model/derived/gasoutputs.go +++ b/model/derived/gasoutputs.go @@ -12,7 +12,9 @@ import ( type GasOutputs struct { tableName struct{} `pg:"derived_gas_outputs"` //nolint: structcheck,unused + Height int64 `pg:",pk,use_zero,notnull"` Cid string `pg:",pk,notnull"` + StateRoot string `pg:",pk,notnull"` From string `pg:",notnull"` To string `pg:",notnull"` Value string `pg:",notnull"` @@ -22,7 +24,7 @@ type GasOutputs struct { SizeBytes int `pg:",use_zero,notnull"` Nonce uint64 `pg:",use_zero,notnull"` Method uint64 `pg:",use_zero,notnull"` - StateRoot string `pg:",notnull"` + ActorName string `pg:",notnull"` ExitCode int64 `pg:",use_zero,notnull"` GasUsed int64 `pg:",use_zero,notnull"` ParentBaseFee string `pg:",notnull"` @@ -65,8 +67,3 @@ func (l GasOutputsList) PersistWithTx(ctx context.Context, tx *pg.Tx) error { } return nil } - -type ProcessingGasOutputs struct { - Height int64 - GasOutputs -} diff --git a/storage/migrations/25_gasoutputs_height_code.go b/storage/migrations/25_gasoutputs_height_code.go new file mode 100644 index 000000000..acf691e40 --- /dev/null +++ b/storage/migrations/25_gasoutputs_height_code.go @@ -0,0 +1,43 @@ +package migrations + +import "github.com/go-pg/migrations/v8" + +// Schema version 25 adds Height and ActorName to gas outputs table + +func init() { + up := batch(` + + DO $$ + BEGIN + IF ( + SELECT count(*) + FROM information_schema.constraint_column_usage + WHERE table_schema = 'public' + AND table_name = 'derived_gas_outputs' + AND constraint_name='derived_gas_outputs_pkey' + AND column_name IN ('height','cid','state_root') + ) != 3 -- want all three columns in the index + THEN + -- Can't change primary key while data exists + TRUNCATE TABLE public.derived_gas_outputs; + + ALTER TABLE public.derived_gas_outputs ADD COLUMN height bigint NOT NULL; + ALTER TABLE public.derived_gas_outputs ADD COLUMN actor_name text NOT NULL; + ALTER TABLE public.derived_gas_outputs DROP CONSTRAINT derived_gas_outputs_pkey; + ALTER TABLE public.derived_gas_outputs ADD PRIMARY KEY (height,cid,state_root); + END IF; + END + $$; + + +`) + down := batch(` + TRUNCATE TABLE public.derived_gas_outputs; + ALTER TABLE public.derived_gas_outputs DROP CONSTRAINT derived_gas_outputs_pkey; + ALTER TABLE public.derived_gas_outputs ADD PRIMARY KEY (cid); + ALTER TABLE public.derived_gas_outputs DROP COLUMN height; + ALTER TABLE public.derived_gas_outputs DROP COLUMN actor_name; +`) + + migrations.MustRegisterTx(up, down) +} diff --git a/storage/sql.go b/storage/sql.go index 18e0a76cd..53d23147b 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -540,10 +540,10 @@ func useNullIfEmpty(s string) *string { } // LeaseGasOutputsMessages leases a set of messages that have receipts for gas output processing. minHeight and maxHeight define an inclusive range of heights to process. -func (d *Database) LeaseGasOutputsMessages(ctx context.Context, claimUntil time.Time, batchSize int, minHeight, maxHeight int64) ([]*derived.ProcessingGasOutputs, error) { +func (d *Database) LeaseGasOutputsMessages(ctx context.Context, claimUntil time.Time, batchSize int, minHeight, maxHeight int64) ([]*derived.GasOutputs, error) { stop := metrics.Timer(ctx, metrics.BatchSelectionDuration) defer stop() - var list []*derived.ProcessingGasOutputs + var list []*derived.GasOutputs if err := d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error { _, err := tx.QueryContext(ctx, &list, ` @@ -583,11 +583,11 @@ SELECT * FROM leased; } // FindGasOutputsMessages finds a set of messages that have receipts for gas output processing but does not take a lease out. minHeight and maxHeight define an inclusive range of heights to process. -func (d *Database) FindGasOutputsMessages(ctx context.Context, batchSize int, minHeight, maxHeight int64) ([]*derived.ProcessingGasOutputs, error) { +func (d *Database) FindGasOutputsMessages(ctx context.Context, batchSize int, minHeight, maxHeight int64) ([]*derived.GasOutputs, error) { stop := metrics.Timer(ctx, metrics.BatchSelectionDuration) defer stop() - var list []*derived.ProcessingGasOutputs + var list []*derived.GasOutputs if err := d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error { _, err := tx.QueryContext(ctx, &list, ` diff --git a/tasks/message/gasoutputs.go b/tasks/message/gasoutputs.go index 7b4a7560c..4bbfb8900 100644 --- a/tasks/message/gasoutputs.go +++ b/tasks/message/gasoutputs.go @@ -2,10 +2,16 @@ package message import ( "context" + "errors" "time" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/lotus/chain/state" + "github.com/filecoin-project/lotus/chain/types" "github.com/go-pg/pg/v10" + "github.com/ipfs/go-cid" "github.com/raulk/clock" "go.opencensus.io/tag" "go.opentelemetry.io/otel/api/global" @@ -15,6 +21,7 @@ import ( "github.com/filecoin-project/sentinel-visor/metrics" "github.com/filecoin-project/sentinel-visor/model/derived" "github.com/filecoin-project/sentinel-visor/storage" + "github.com/filecoin-project/sentinel-visor/tasks/actorstate" "github.com/filecoin-project/sentinel-visor/wait" ) @@ -65,7 +72,7 @@ func (p *GasOutputsProcessor) processBatch(ctx context.Context, node lens.API) ( claimUntil := p.clock.Now().Add(p.leaseLength) - var batch []*derived.ProcessingGasOutputs + var batch []*derived.GasOutputs var err error if p.useLeases { @@ -104,7 +111,7 @@ func (p *GasOutputsProcessor) processBatch(ctx context.Context, node lens.API) ( errorLog := log.With("cid", item.Cid) - if err := p.processItem(ctx, node, &item.GasOutputs); err != nil { + if err := p.processItem(ctx, node, item); err != nil { // Any errors are likely to be problems using the lens, mark this tipset as failed and exit this batch errorLog.Errorw("failed to process message", "error", err.Error()) if err := p.storage.MarkGasOutputsMessagesComplete(ctx, item.Height, item.Cid, p.clock.Now(), err.Error()); err != nil { @@ -125,6 +132,35 @@ func (p *GasOutputsProcessor) processItem(ctx context.Context, node lens.API, it stop := metrics.Timer(ctx, metrics.ProcessingDuration) defer stop() + // Note: this item will only be processed if there are receipts for + // it, which means there should be a tipset at height+1. This is only + // used to get the destination actor code, so we don't care about side + // chains. + child, err := node.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(item.Height+1), types.NewTipSetKey()) + if err != nil { + return xerrors.Errorf("Failed to load child tipset: %w", err) + } + + st, err := state.LoadStateTree(node.Store(), child.ParentState()) + if err != nil { + return xerrors.Errorf("load state tree when gas outputs for %s: %w", item.Cid, err) + } + + dstAddr, err := address.NewFromString(item.To) + if err != nil { + return xerrors.Errorf("parse to address failed for gas outputs in %s: %w", item.Cid, err) + } + + var dstActorCode cid.Cid + dstActor, err := st.GetActor(dstAddr) + if err != nil { + if !errors.Is(err, types.ErrActorNotFound) { + return xerrors.Errorf("get destination actor for gas outputs %s failed: %w", item.Cid, err) + } + } else { + dstActorCode = dstActor.Code + } + baseFee, err := big.FromString(item.ParentBaseFee) if err != nil { return xerrors.Errorf("parse fee cap: %w", err) @@ -144,6 +180,7 @@ func (p *GasOutputsProcessor) processItem(ctx context.Context, node lens.API, it outputs := node.ComputeGasOutputs(item.GasUsed, item.GasLimit, baseFee, feeCap, gasPremium) cgoStop() + item.ActorName = actorstate.ActorNameByCode(dstActorCode) item.BaseFeeBurn = outputs.BaseFeeBurn.String() item.OverEstimationBurn = outputs.OverEstimationBurn.String() item.MinerPenalty = outputs.MinerPenalty.String()