Skip to content

Commit

Permalink
update file writing mode
Browse files Browse the repository at this point in the history
  • Loading branch information
i-norden committed Mar 24, 2022
1 parent 9775355 commit 7bc4c75
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 143 deletions.
2 changes: 1 addition & 1 deletion statediff/indexer/database/file/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package file

// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
type BatchTx struct {
BlockNumber uint64
BlockNumber string

submit func(blockTx *BatchTx, err error) error
}
Expand Down
162 changes: 90 additions & 72 deletions statediff/indexer/database/file/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math/big"
"os"
"sync"
"sync/atomic"
"time"

"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -53,10 +54,12 @@ var (

// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
type StateDiffIndexer struct {
fileWriter *SQLWriter
chainConfig *params.ChainConfig
nodeID string
wg *sync.WaitGroup
fileWriter *SQLWriter
chainConfig *params.ChainConfig
nodeID string
wg *sync.WaitGroup
blockNumber string
removedCacheFlag *uint32
}

// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
Expand All @@ -77,7 +80,6 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
wg := new(sync.WaitGroup)
w.Loop()
w.upsertNode(config.NodeInfo)
w.upsertIPLDDirect(shared.RemovedNodeMhKey, []byte{})
return &StateDiffIndexer{
fileWriter: w,
chainConfig: chainConfig,
Expand All @@ -92,6 +94,8 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
sdi.removedCacheFlag = new(uint32)
sdi.blockNumber = block.Number().String()
start, t := time.Now(), time.Now()
blockHash := block.Hash()
blockHashStr := blockHash.String()
Expand Down Expand Up @@ -127,7 +131,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
t = time.Now()

blockTx := &BatchTx{
BlockNumber: height,
BlockNumber: sdi.blockNumber,
submit: func(self *BatchTx, err error) error {
tDiff := time.Since(t)
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
Expand Down Expand Up @@ -189,7 +193,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// processHeader write a header IPLD insert SQL stmt to a file
// it returns the headerID
func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node.Node, reward, td *big.Int) string {
sdi.fileWriter.upsertIPLDNode(headerNode)
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, headerNode)

var baseFee *string
if header.BaseFee != nil {
Expand All @@ -202,7 +206,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node
CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
ParentHash: header.ParentHash.String(),
BlockNumber: header.Number.String(),
BlockNumber: sdi.blockNumber,
BlockHash: headerID,
TotalDifficulty: td.String(),
Reward: reward.String(),
Expand All @@ -221,7 +225,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node
func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) {
// publish and index uncles
for _, uncleNode := range uncleNodes {
sdi.fileWriter.upsertIPLDNode(uncleNode)
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, uncleNode)
var uncleReward *big.Int
// in PoA networks uncle reward is 0
if sdi.chainConfig.Clique != nil {
Expand All @@ -230,12 +234,13 @@ func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64,
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
}
sdi.fileWriter.upsertUncleCID(models.UncleModel{
HeaderID: headerID,
CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
ParentHash: uncleNode.ParentHash.String(),
BlockHash: uncleNode.Hash().String(),
Reward: uncleReward.String(),
BlockNumber: sdi.blockNumber,
HeaderID: headerID,
CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
ParentHash: uncleNode.ParentHash.String(),
BlockHash: uncleNode.Hash().String(),
Reward: uncleReward.String(),
})
}
}
Expand All @@ -261,10 +266,10 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
for i, receipt := range args.receipts {
for _, logTrieNode := range args.logTrieNodes[i] {
sdi.fileWriter.upsertIPLDNode(logTrieNode)
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, logTrieNode)
}
txNode := args.txNodes[i]
sdi.fileWriter.upsertIPLDNode(txNode)
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, txNode)

// index tx
trx := args.txs[i]
Expand All @@ -281,16 +286,17 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
return fmt.Errorf("error deriving tx sender: %v", err)
}
txModel := models.TxModel{
HeaderID: args.headerID,
Dst: shared.HandleZeroAddrPointer(trx.To()),
Src: shared.HandleZeroAddr(from),
TxHash: txID,
Index: int64(i),
Data: trx.Data(),
CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Type: trx.Type(),
Value: val,
BlockNumber: sdi.blockNumber,
HeaderID: args.headerID,
Dst: shared.HandleZeroAddrPointer(trx.To()),
Src: shared.HandleZeroAddr(from),
TxHash: txID,
Index: int64(i),
Data: trx.Data(),
CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Type: trx.Type(),
Value: val,
}
sdi.fileWriter.upsertTransactionCID(txModel)

Expand All @@ -301,6 +307,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
storageKeys[k] = storageKey.Hex()
}
accessListElementModel := models.AccessListElementModel{
BlockNumber: sdi.blockNumber,
TxID: txID,
Index: int64(j),
Address: accessListElement.Address.Hex(),
Expand All @@ -322,6 +329,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
}

rctModel := &models.ReceiptModel{
BlockNumber: sdi.blockNumber,
TxID: txID,
Contract: contract,
ContractHash: contractHash,
Expand Down Expand Up @@ -349,25 +357,26 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
}

logDataSet[idx] = &models.LogsModel{
ReceiptID: txID,
Address: l.Address.String(),
Index: int64(l.Index),
Data: l.Data,
LeafCID: args.logLeafNodeCIDs[i][idx].String(),
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
Topic0: topicSet[0],
Topic1: topicSet[1],
Topic2: topicSet[2],
Topic3: topicSet[3],
BlockNumber: sdi.blockNumber,
ReceiptID: txID,
Address: l.Address.String(),
Index: int64(l.Index),
Data: l.Data,
LeafCID: args.logLeafNodeCIDs[i][idx].String(),
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
Topic0: topicSet[0],
Topic1: topicSet[1],
Topic2: topicSet[2],
Topic3: topicSet[3],
}
}
sdi.fileWriter.upsertLogCID(logDataSet)
}

// publish trie nodes, these aren't indexed directly
for i, n := range args.txTrieNodes {
sdi.fileWriter.upsertIPLDNode(n)
sdi.fileWriter.upsertIPLDNode(args.rctTrieNodes[i])
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, n)
sdi.fileWriter.upsertIPLDNode(sdi.blockNumber, args.rctTrieNodes[i])
}

return nil
Expand All @@ -377,30 +386,34 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
// publish the state node
if stateNode.NodeType == sdtypes.Removed {
// short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
if atomic.LoadUint32(sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(sdi.removedCacheFlag, 1)
sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, shared.RemovedNodeMhKey, []byte{})
}
stateModel := models.StateNodeModel{
HeaderID: headerID,
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
MhKey: shared.RemovedNodeMhKey,
NodeType: stateNode.NodeType.Int(),
BlockNumber: sdi.blockNumber,
HeaderID: headerID,
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
MhKey: shared.RemovedNodeMhKey,
NodeType: stateNode.NodeType.Int(),
}
sdi.fileWriter.upsertStateCID(stateModel)
return nil
}
stateCIDStr, stateMhKey, err := sdi.fileWriter.upsertIPLDRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
stateCIDStr, stateMhKey, err := sdi.fileWriter.upsertIPLDRaw(sdi.blockNumber, ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
if err != nil {
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
}
stateModel := models.StateNodeModel{
HeaderID: headerID,
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: stateCIDStr,
MhKey: stateMhKey,
NodeType: stateNode.NodeType.Int(),
BlockNumber: sdi.blockNumber,
HeaderID: headerID,
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: stateCIDStr,
MhKey: stateMhKey,
NodeType: stateNode.NodeType.Int(),
}
// index the state node
sdi.fileWriter.upsertStateCID(stateModel)
Expand All @@ -418,6 +431,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error decoding state account rlp: %s", err.Error())
}
accountModel := models.StateAccountModel{
BlockNumber: sdi.blockNumber,
HeaderID: headerID,
StatePath: stateNode.Path,
Balance: account.Balance.String(),
Expand All @@ -430,32 +444,36 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// if there are any storage nodes associated with this node, publish and index them
for _, storageNode := range stateNode.StorageNodes {
if storageNode.NodeType == sdtypes.Removed {
// short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
if atomic.LoadUint32(sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(sdi.removedCacheFlag, 1)
sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, shared.RemovedNodeMhKey, []byte{})
}
storageModel := models.StorageNodeModel{
HeaderID: headerID,
StatePath: stateNode.Path,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: shared.RemovedNodeStorageCID,
MhKey: shared.RemovedNodeMhKey,
NodeType: storageNode.NodeType.Int(),
BlockNumber: sdi.blockNumber,
HeaderID: headerID,
StatePath: stateNode.Path,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: shared.RemovedNodeStorageCID,
MhKey: shared.RemovedNodeMhKey,
NodeType: storageNode.NodeType.Int(),
}
sdi.fileWriter.upsertStorageCID(storageModel)
continue
}
storageCIDStr, storageMhKey, err := sdi.fileWriter.upsertIPLDRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
storageCIDStr, storageMhKey, err := sdi.fileWriter.upsertIPLDRaw(sdi.blockNumber, ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
if err != nil {
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
}
storageModel := models.StorageNodeModel{
HeaderID: headerID,
StatePath: stateNode.Path,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: storageCIDStr,
MhKey: storageMhKey,
NodeType: storageNode.NodeType.Int(),
BlockNumber: sdi.blockNumber,
HeaderID: headerID,
StatePath: stateNode.Path,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: storageCIDStr,
MhKey: storageMhKey,
NodeType: storageNode.NodeType.Int(),
}
sdi.fileWriter.upsertStorageCID(storageModel)
}
Expand All @@ -470,7 +488,7 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd
if err != nil {
return fmt.Errorf("error deriving multihash key from codehash: %v", err)
}
sdi.fileWriter.upsertIPLDDirect(mhKey, codeAndCodeHash.Code)
sdi.fileWriter.upsertIPLDDirect(sdi.blockNumber, mhKey, codeAndCodeHash.Code)
return nil
}

Expand Down
Loading

0 comments on commit 7bc4c75

Please sign in to comment.