Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add new task for fevm receipt #1208

Merged
merged 7 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 46 additions & 34 deletions chain/datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,51 +40,32 @@ var (
executedTsCacheSize int
diffPreCommitCacheSize int
diffSectorCacheSize int
actorCacheSize int

tipsetMessageReceiptSizeEnv = "LILY_TIPSET_MSG_RECEIPT_CACHE_SIZE"
executedTsCacheSizeEnv = "LILY_EXECUTED_TS_CACHE_SIZE"
diffPreCommitCacheSizeEnv = "LILY_DIFF_PRECOMMIT_CACHE_SIZE"
diffSectorCacheSizeEnv = "LILY_DIFF_SECTORS_CACHE_SIZE"
actorCacheSizeEnv = "LILY_ACTOR_CACHE_SIZE"
)

func init() {
tipsetMessageReceiptCacheSize = 4
executedTsCacheSize = 4
diffPreCommitCacheSize = 500
diffSectorCacheSize = 500
if s := os.Getenv(tipsetMessageReceiptSizeEnv); s != "" {
v, err := strconv.ParseInt(s, 10, 64)
if err == nil {
tipsetMessageReceiptCacheSize = int(v)
} else {
log.Warnf("invalid value (%s) for %s defaulting to %d: %s", s, tipsetMessageReceiptSizeEnv, tipsetMessageReceiptCacheSize, err)
}
}
if s := os.Getenv(executedTsCacheSizeEnv); s != "" {
func getCacheSizeFromEnv(env string, defaultValue int) int {
if s := os.Getenv(env); s != "" {
v, err := strconv.ParseInt(s, 10, 64)
if err == nil {
executedTsCacheSize = int(v)
} else {
log.Warnf("invalid value (%s) for %s defaulting to %d: %s", s, executedTsCacheSizeEnv, executedTsCacheSize, err)
}
}
if s := os.Getenv(diffPreCommitCacheSizeEnv); s != "" {
v, err := strconv.ParseInt(s, 10, 64)
if err == nil {
diffPreCommitCacheSize = int(v)
} else {
log.Warnf("invalid value (%s) for %s defaulting to %d: %s", s, diffPreCommitCacheSizeEnv, diffPreCommitCacheSize, err)
}
}
if s := os.Getenv(diffSectorCacheSizeEnv); s != "" {
v, err := strconv.ParseInt(s, 10, 64)
if err == nil {
diffSectorCacheSize = int(v)
} else {
log.Warnf("invalid value (%s) for %s defaulting to %d: %s", s, diffSectorCacheSizeEnv, diffSectorCacheSize, err)
return int(v)
}
log.Warnf("invalid value (%s) for %s defaulting to %d: %s", s, env, defaultValue, err)
}
return defaultValue
}

func init() {
tipsetMessageReceiptCacheSize = getCacheSizeFromEnv(tipsetMessageReceiptSizeEnv, 4)
executedTsCacheSize = getCacheSizeFromEnv(executedTsCacheSizeEnv, 4)
diffPreCommitCacheSize = getCacheSizeFromEnv(diffPreCommitCacheSizeEnv, 500)
diffSectorCacheSize = getCacheSizeFromEnv(diffSectorCacheSizeEnv, 500)
actorCacheSize = getCacheSizeFromEnv(actorCacheSizeEnv, 1000)
Comment on lines +63 to +68
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks much cleaner!

}

var _ tasks.DataSource = (*DataSource)(nil)
Expand Down Expand Up @@ -117,6 +98,11 @@ func NewDataSource(node lens.API) (*DataSource, error) {
return nil, err
}

t.actorCache, err = lru.New(actorCacheSize)
if err != nil {
return nil, err
}

return t, nil
}

Expand All @@ -134,6 +120,8 @@ type DataSource struct {

diffPreCommitCache *lru.Cache
diffPreCommitGroup singleflight.Group

actorCache *lru.Cache
}

func (t *DataSource) MessageReceiptEvents(ctx context.Context, root cid.Cid) ([]types.Event, error) {
Expand All @@ -148,10 +136,18 @@ func (t *DataSource) TipSetBlockMessages(ctx context.Context, ts *types.TipSet)
return t.node.MessagesForTipSetBlocks(ctx, ts)
}

func (t *DataSource) ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error) {
return t.node.ChainGetMessagesInTipset(ctx, tsk)
}

func (t *DataSource) EthGetBlockByHash(ctx context.Context, blkHash ethtypes.EthHash, fullTxInfo bool) (ethtypes.EthBlock, error) {
return t.node.EthGetBlockByHash(ctx, blkHash, fullTxInfo)
}

func (t *DataSource) EthGetTransactionReceipt(ctx context.Context, txHash ethtypes.EthHash) (*api.EthTxReceipt, error) {
return t.node.EthGetTransactionReceipt(ctx, txHash)
}

// TipSetMessageReceipts returns the blocks and messages in `pts` and their corresponding receipts from `ts` matching block order in tipset (`pts`).
// TODO replace with lotus chainstore method when https://github.com/filecoin-project/lotus/pull/9186 lands
func (t *DataSource) TipSetMessageReceipts(ctx context.Context, ts, pts *types.TipSet) ([]*lens.BlockMessageReceipts, error) {
Expand Down Expand Up @@ -192,13 +188,29 @@ func (t *DataSource) Store() adt.Store {
}

func (t *DataSource) Actor(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*types.Actor, error) {
metrics.RecordInc(ctx, metrics.DataSourceActorCacheRead)
ctx, span := otel.Tracer("").Start(ctx, "DataSource.Actor")
if span.IsRecording() {
span.SetAttributes(attribute.String("tipset", tsk.String()))
span.SetAttributes(attribute.String("address", addr.String()))
}
defer span.End()
return t.node.StateGetActor(ctx, addr, tsk)

key, keyErr := asKey(addr, tsk)
if keyErr == nil {
value, found := t.actorCache.Get(key)
if found {
metrics.RecordInc(ctx, metrics.DataSourceActorCacheHit)
return value.(*types.Actor), nil
}
}

act, err := t.node.StateGetActor(ctx, addr, tsk)
if err == nil && keyErr == nil {
t.actorCache.Add(key, act)
}

return act, err
}

func (t *DataSource) MinerPower(ctx context.Context, addr address.Address, ts *types.TipSet) (*api.MinerPower, error) {
Expand Down
3 changes: 3 additions & 0 deletions chain/indexer/integrated/processor/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (

// fevm task
fevmblockheadertask "github.com/filecoin-project/lily/tasks/fevm/blockheader"
fevmreceipttask "github.com/filecoin-project/lily/tasks/fevm/receipt"
fevmactorstatstask "github.com/filecoin-project/lily/tasks/fevmactorstats"

"github.com/filecoin-project/lily/chain/indexer/tasktype"
Expand Down Expand Up @@ -644,6 +645,8 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces
out.TipsetProcessors[t] = fevmactorstatstask.NewTask(api)
case tasktype.FEVMBlockHeader:
out.TipsetsProcessors[t] = fevmblockheadertask.NewTask(api)
case tasktype.FEVMReceipt:
out.TipsetsProcessors[t] = fevmreceipttask.NewTask(api)

case BuiltinTaskName:
out.ReportProcessors[t] = indexertask.NewTask(api)
Expand Down
2 changes: 1 addition & 1 deletion chain/indexer/integrated/processor/state_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestNewProcessor(t *testing.T) {
require.Equal(t, t.Name(), proc.name)
require.Len(t, proc.actorProcessors, 24)
require.Len(t, proc.tipsetProcessors, 10)
require.Len(t, proc.tipsetsProcessors, 10)
require.Len(t, proc.tipsetsProcessors, 11)
require.Len(t, proc.builtinProcessors, 1)

require.Equal(t, gasoutput.NewTask(nil), proc.tipsetsProcessors[tasktype.GasOutputs])
Expand Down
2 changes: 1 addition & 1 deletion chain/indexer/integrated/processor/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,6 @@ func TestMakeProcessorsAllTasks(t *testing.T) {
require.NoError(t, err)
require.Len(t, proc.ActorProcessors, 24)
require.Len(t, proc.TipsetProcessors, 10)
require.Len(t, proc.TipsetsProcessors, 10)
require.Len(t, proc.TipsetsProcessors, 11)
require.Len(t, proc.ReportProcessors, 1)
}
5 changes: 5 additions & 0 deletions chain/indexer/tasktype/table_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
VerifiedRegistryClaim = "verified_registry_claim"
FEVMActorStats = "fevm_actor_stats"
FEVMBlockHeader = "fevm_block_header"
FEVMReceipt = "fevm_receipt"
)

var AllTableTasks = []string{
Expand Down Expand Up @@ -93,6 +94,7 @@ var AllTableTasks = []string{
VerifiedRegistryClaim,
FEVMActorStats,
FEVMBlockHeader,
FEVMReceipt,
}

var TableLookup = map[string]struct{}{
Expand Down Expand Up @@ -140,6 +142,7 @@ var TableLookup = map[string]struct{}{
VerifiedRegistryClaim: {},
FEVMActorStats: {},
FEVMBlockHeader: {},
FEVMReceipt: {},
}

var TableComment = map[string]string{
Expand Down Expand Up @@ -187,6 +190,7 @@ var TableComment = map[string]string{
VerifiedRegistryClaim: ``,
FEVMActorStats: ``,
FEVMBlockHeader: ``,
FEVMReceipt: ``,
}

var TableFieldComments = map[string]map[string]string{
Expand Down Expand Up @@ -291,4 +295,5 @@ var TableFieldComments = map[string]map[string]string{
VerifiedRegistryClaim: {},
FEVMActorStats: {},
FEVMBlockHeader: {},
FEVMReceipt: {},
}
1 change: 1 addition & 0 deletions chain/indexer/tasktype/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ var TaskLookup = map[string][]string{
FEVMTask: {
FEVMActorStats,
FEVMBlockHeader,
FEVMReceipt,
},
}

Expand Down
2 changes: 1 addition & 1 deletion chain/indexer/tasktype/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestMakeAllTaskAliasNames(t *testing.T) {
}

func TestMakeAllTaskNames(t *testing.T) {
const TotalTableTasks = 44
const TotalTableTasks = 45
actual, err := tasktype.MakeTaskNames(tasktype.AllTableTasks)
require.NoError(t, err)
// if this test fails it means a new task name was added, update the above test
Expand Down
2 changes: 2 additions & 0 deletions lens/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type ChainAPI interface {

ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api.BlockMessages, error)
ChainGetParentMessages(ctx context.Context, blockCid cid.Cid) ([]api.Message, error)
ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error)

ComputeBaseFee(ctx context.Context, ts *types.TipSet) (abi.TokenAmount, error)

Expand Down Expand Up @@ -78,6 +79,7 @@ type VMAPI interface {

type EthModuleAPI interface {
EthGetBlockByHash(ctx context.Context, blkHash ethtypes.EthHash, fullTxInfo bool) (ethtypes.EthBlock, error)
EthGetTransactionReceipt(ctx context.Context, txHash ethtypes.EthHash) (*api.EthTxReceipt, error)
}

type MessageExecution struct {
Expand Down
2 changes: 2 additions & 0 deletions lens/lily/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type LilyAPI interface {
ChainPrune(ctx context.Context, opts api.PruneOpts) error //perm:read
ChainHotGC(ctx context.Context, opts api.HotGCOpts) error //perm:read
EthGetBlockByHash(ctx context.Context, blkHash ethtypes.EthHash, fullTxInfo bool) (ethtypes.EthBlock, error) //perm:read
EthGetTransactionReceipt(ctx context.Context, txHash ethtypes.EthHash) (*api.EthTxReceipt, error) //perm:read
ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error) //perm:read

// trigger graceful shutdown
Shutdown(context.Context) error
Expand Down
8 changes: 8 additions & 0 deletions lens/lily/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,14 @@ func (m *LilyNodeAPI) EthGetBlockByHash(ctx context.Context, blkHash ethtypes.Et
return m.EthModuleAPI.EthGetBlockByHash(ctx, blkHash, fullTxInfo)
}

func (m *LilyNodeAPI) EthGetTransactionReceipt(ctx context.Context, txHash ethtypes.EthHash) (*api.EthTxReceipt, error) {
return m.EthModuleAPI.EthGetTransactionReceipt(ctx, txHash)
}

func (m *LilyNodeAPI) ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error) {
return m.ChainAPI.ChainGetMessagesInTipset(ctx, tsk)
}

// MessagesForTipSetBlocks returns messages stored in the blocks of the specified tipset, messages may be duplicated
// across the returned set of BlockMessages.
func (m *LilyNodeAPI) MessagesForTipSetBlocks(ctx context.Context, ts *types.TipSet) ([]*lens.BlockMessages, error) {
Expand Down
10 changes: 10 additions & 0 deletions lens/lily/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type LilyAPIStruct struct {
ChainPrune func(ctx context.Context, opts api.PruneOpts) error `perm:"read"`
ChainHotGC func(ctx context.Context, opts api.HotGCOpts) error `perm:"read"`
EthGetBlockByHash func(ctx context.Context, blkHash ethtypes.EthHash, fullTxInfo bool) (ethtypes.EthBlock, error) `perm:"read"`
EthGetTransactionReceipt func(ctx context.Context, txHash ethtypes.EthHash) (*api.EthTxReceipt, error) `perm:"read"`
ChainGetMessagesInTipset func(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error) `perm:"read"`

LogList func(context.Context) ([]string, error) `perm:"read"`
LogSetLevel func(context.Context, string, string) error `perm:"read"`
Expand Down Expand Up @@ -274,3 +276,11 @@ func (s *LilyAPIStruct) LilyGapFillNotify(ctx context.Context, cfg *LilyGapFillN
func (s *LilyAPIStruct) EthGetBlockByHash(ctx context.Context, blkHash ethtypes.EthHash, fullTxInfo bool) (ethtypes.EthBlock, error) {
return s.Internal.EthGetBlockByHash(ctx, blkHash, fullTxInfo)
}

func (s *LilyAPIStruct) EthGetTransactionReceipt(ctx context.Context, txHash ethtypes.EthHash) (*api.EthTxReceipt, error) {
return s.Internal.EthGetTransactionReceipt(ctx, txHash)
}

func (s *LilyAPIStruct) ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error) {
return s.Internal.ChainGetMessagesInTipset(ctx, tsk)
}
2 changes: 2 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ var (
DataSourceMessageExecutionRead = stats.Int64("data_source_message_execution_read", "Number of reads for message executions", stats.UnitDimensionless)
DataSourceMessageExecutionCacheHit = stats.Int64("data_source_message_execution_cache_hit", "Number of cache hits for message executions", stats.UnitDimensionless)
DataSourceActorStateChangesDuration = stats.Float64("data_source_actor_state_change_ms", "Time take to collect actors whose state changed", stats.UnitMilliseconds)
DataSourceActorCacheRead = stats.Int64("data_source_actor_read", "Number of reads for message executions", stats.UnitDimensionless)
DataSourceActorCacheHit = stats.Int64("data_source_actor_cache_hit", "Number of cache hits for message executions", stats.UnitDimensionless)

// Distributed Indexer

Expand Down
51 changes: 51 additions & 0 deletions model/fevm/receipt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package fevm

import (
"context"

"go.opencensus.io/tag"

"github.com/filecoin-project/lily/metrics"
"github.com/filecoin-project/lily/model"
)

type FEVMReceipt struct {
tableName struct{} `pg:"fevm_receipt"` // nolint: structcheck

// Height message was executed at.
Height int64 `pg:",pk,notnull,use_zero"`

// Message CID
Message string `pg:",use_zero"`

TransactionHash string `pg:",notnull"`
TransactionIndex uint64 `pg:",use_zero"`
BlockHash string `pg:",notnull"`
BlockNumber uint64 `pg:",use_zero"`
From string `pg:",notnull"`
To string `pg:",notnull"`
ContractAddress string `pg:",notnull"`
Status uint64 `pg:",use_zero"`
CumulativeGasUsed uint64 `pg:",use_zero"`
GasUsed uint64 `pg:",use_zero"`
EffectiveGasPrice int64 `pg:",use_zero"`
LogsBloom string `pg:",notnull"`
Logs string `pg:",type:jsonb"`
}

func (f *FEVMReceipt) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "fevm_receipt"))
metrics.RecordCount(ctx, metrics.PersistModel, 1)
return s.PersistModel(ctx, f)
}

type FEVMReceiptList []*FEVMReceipt

func (f FEVMReceiptList) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error {
if len(f) == 0 {
return nil
}
ctx, _ = tag.New(ctx, tag.Upsert(metrics.Table, "fevm_receipt"))
metrics.RecordCount(ctx, metrics.PersistModel, len(f))
return s.PersistModel(ctx, f)
}
27 changes: 27 additions & 0 deletions schemas/v1/23_fevm_receipt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package v1

func init() {
patches.Register(
23,
`
CREATE TABLE IF NOT EXISTS {{ .SchemaName | default "public"}}.fevm_receipt (
height BIGINT NOT NULL,
logs jsonb,
transaction_hash TEXT,
transaction_index BIGINT,
block_hash TEXT,
block_number BIGINT,
"from" TEXT,
"to" TEXT,
contract_address TEXT,
status BIGINT,
cumulative_gas_used BIGINT,
gas_used BIGINT,
effective_gas_price BIGINT,
logs_bloom TEXT,
message TEXT,
PRIMARY KEY(height, transaction_hash)
);
`,
)
}
1 change: 1 addition & 0 deletions storage/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ var Models = []interface{}{

(*fevm.FEVMActorStats)(nil),
(*fevm.FEVMBlockHeader)(nil),
(*fevm.FEVMReceipt)(nil),
}

var log = logging.Logger("lily/storage")
Expand Down
2 changes: 2 additions & 0 deletions tasks/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,6 @@ type DataSource interface {
ShouldBurnFn(ctx context.Context, ts *types.TipSet) (lens.ShouldBurnFn, error)

EthGetBlockByHash(ctx context.Context, blkHash ethtypes.EthHash, fullTxInfo bool) (ethtypes.EthBlock, error)
EthGetTransactionReceipt(ctx context.Context, txHash ethtypes.EthHash) (*api.EthTxReceipt, error)
ChainGetMessagesInTipset(ctx context.Context, tsk types.TipSetKey) ([]api.Message, error)
}
Loading