Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit Accepted Block Cache + Store BlockID Index On-Disk #558

Merged
merged 5 commits into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions cli/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,16 @@ func (h *Handler) GeneratePrometheus(baseURI string, openBrowser bool, startProm
if err := cmd.Run(); err != nil {
errChan <- err
utils.Outf("{{orange}}prometheus exited with error:{{/}} %v\n", err)
utils.Outf(`install prometheus using the following commands:

rm -f /tmp/prometheus
wget https://github.com/prometheus/prometheus/releases/download/v2.43.0/prometheus-2.43.0.darwin-amd64.tar.gz
tar -xvf prometheus-2.43.0.darwin-amd64.tar.gz
rm prometheus-2.43.0.darwin-amd64.tar.gz
mv prometheus-2.43.0.darwin-amd64/prometheus /tmp/prometheus
rm -rf prometheus-2.43.0.darwin-amd64

`)
return err
}
utils.Outf("{{cyan}}prometheus exited{{/}}\n")
Expand Down
13 changes: 7 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,18 @@ func (c *Config) GetMempoolPayerSize() int { return 32 }
func (c *Config) GetMempoolExemptPayers() [][]byte { return nil }
func (c *Config) GetStreamingBacklogSize() int { return 1024 }
func (c *Config) GetStateEvictionBatchSize() int { return 4 * units.MiB }
func (c *Config) GetIntermediateNodeCacheSize() int { return 2 * units.GiB }
func (c *Config) GetIntermediateNodeCacheSize() int { return 4 * units.GiB }
func (c *Config) GetValueNodeCacheSize() int { return 2 * units.GiB }
func (c *Config) GetTraceConfig() *trace.Config { return &trace.Config{Enabled: false} }
func (c *Config) GetStateSyncParallelism() int { return 4 }
func (c *Config) GetStateSyncServerDelay() time.Duration { return 0 } // used for testing

func (c *Config) GetParsedBlockCacheSize() int { return 128 }
func (c *Config) GetStateHistoryLength() int { return 256 }
func (c *Config) GetAcceptedBlockWindow() int { return 768 }
func (c *Config) GetStateSyncMinBlocks() uint64 { return 768 }
func (c *Config) GetAcceptorSize() int { return 1024 }
func (c *Config) GetParsedBlockCacheSize() int { return 128 }
func (c *Config) GetStateHistoryLength() int { return 256 }
func (c *Config) GetAcceptedBlockWindowCache() int { return 128 } // 256MB at 2MB blocks
func (c *Config) GetAcceptedBlockWindow() int { return 50_000 } // ~3.5hr with 250ms block time (100GB at 2MB)
func (c *Config) GetStateSyncMinBlocks() uint64 { return 768 } // set to max int for archive nodes to ensure no skips
func (c *Config) GetAcceptorSize() int { return 64 }

func (c *Config) GetContinuousProfilerConfig() *profiler.Config {
return &profiler.Config{Enabled: false}
Expand Down
2 changes: 1 addition & 1 deletion examples/morpheusvm/scripts/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ rm -f ${TMPDIR}/morpheusvm.subnet
cat <<EOF > ${TMPDIR}/morpheusvm.subnet
{
"proposerMinBlockDelay": 0,
"proposerNumHistoricalBlocks": 768
"proposerNumHistoricalBlocks": 50000
}
EOF

Expand Down
2 changes: 1 addition & 1 deletion examples/tokenvm/scripts/deploy.devnet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ fi
cat <<EOF > ${DEPLOY_ARTIFACT_PREFIX}/tokenvm-subnet-config.json
{
"proposerMinBlockDelay": 0,
"proposerNumHistoricalBlocks": 768
"proposerNumHistoricalBlocks": 50000
}
EOF
cat ${DEPLOY_ARTIFACT_PREFIX}/tokenvm-subnet-config.json
Expand Down
4 changes: 2 additions & 2 deletions examples/tokenvm/scripts/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ cat <<EOF > ${TMPDIR}/tokenvm.config
"mempoolPayerSize": 10000000,
"mempoolExemptPayers":["token1rvzhmceq997zntgvravfagsks6w0ryud3rylh4cdvayry0dl97nsjzf3yp"],
"parallelism": 5,
"verifySignatures":true,
"verifySignatures": true,
"storeTransactions": ${STORE_TXS},
"streamingBacklogSize": 10000000,
"trackedPairs":["*"],
Expand All @@ -172,7 +172,7 @@ rm -f ${TMPDIR}/tokenvm.subnet
cat <<EOF > ${TMPDIR}/tokenvm.subnet
{
"proposerMinBlockDelay": 0,
"proposerNumHistoricalBlocks": 768
"proposerNumHistoricalBlocks": 50000
}
EOF

Expand Down
2 changes: 1 addition & 1 deletion pebble/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Config struct {
func NewDefaultConfig() Config {
return Config{
CacheSize: 1024 * 1024 * 1024,
BytesPerSync: 1024 * 1024,
BytesPerSync: 4 * 1024 * 1024, // block size is usually at least 2MB
MemTableStopWritesThreshold: 8,
MemTableSize: 16 * 1024 * 1024,
MaxOpenFiles: 4_096,
Expand Down
1 change: 1 addition & 0 deletions vm/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Config interface {
GetStateSyncServerDelay() time.Duration
GetParsedBlockCacheSize() int
GetAcceptedBlockWindow() int
GetAcceptedBlockWindowCache() int
GetContinuousProfilerConfig() *profiler.Config
GetTargetBuildDuration() time.Duration
GetProcessingBuildSkip() int
Expand Down
66 changes: 40 additions & 26 deletions vm/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,34 @@ import (
)

type Metrics struct {
txsSubmitted prometheus.Counter // includes gossip
txsReceived prometheus.Counter
seenTxsReceived prometheus.Counter
txsGossiped prometheus.Counter
txsVerified prometheus.Counter
txsAccepted prometheus.Counter
stateChanges prometheus.Counter
stateOperations prometheus.Counter
buildCapped prometheus.Counter
emptyBlockBuilt prometheus.Counter
clearedMempool prometheus.Counter
deletedBlocks prometheus.Counter
mempoolSize prometheus.Gauge
bandwidthPrice prometheus.Gauge
computePrice prometheus.Gauge
storageReadPrice prometheus.Gauge
storageCreatePrice prometheus.Gauge
storageModifyPrice prometheus.Gauge
rootCalculated metric.Averager
waitRoot metric.Averager
waitSignatures metric.Averager
blockBuild metric.Averager
blockParse metric.Averager
blockVerify metric.Averager
blockAccept metric.Averager
blockProcess metric.Averager
txsSubmitted prometheus.Counter // includes gossip
txsReceived prometheus.Counter
seenTxsReceived prometheus.Counter
txsGossiped prometheus.Counter
txsVerified prometheus.Counter
txsAccepted prometheus.Counter
stateChanges prometheus.Counter
stateOperations prometheus.Counter
buildCapped prometheus.Counter
emptyBlockBuilt prometheus.Counter
clearedMempool prometheus.Counter
deletedBlocks prometheus.Counter
blocksFromDisk prometheus.Counter
blocksHeightsFromDisk prometheus.Counter
mempoolSize prometheus.Gauge
bandwidthPrice prometheus.Gauge
computePrice prometheus.Gauge
storageReadPrice prometheus.Gauge
storageCreatePrice prometheus.Gauge
storageModifyPrice prometheus.Gauge
rootCalculated metric.Averager
waitRoot metric.Averager
waitSignatures metric.Averager
blockBuild metric.Averager
blockParse metric.Averager
blockVerify metric.Averager
blockAccept metric.Averager
blockProcess metric.Averager
}

func newMetrics() (*prometheus.Registry, *Metrics, error) {
Expand Down Expand Up @@ -175,6 +177,16 @@ func newMetrics() (*prometheus.Registry, *Metrics, error) {
Name: "deleted_blocks",
Help: "number of blocks deleted",
}),
blocksFromDisk: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "vm",
Name: "blocks_from_disk",
Help: "number of blocks attempted to load from disk",
}),
blocksHeightsFromDisk: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "vm",
Name: "block_heights_from_disk",
Help: "number of block heights attempted to load from disk",
}),
mempoolSize: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "chain",
Name: "mempool_size",
Expand Down Expand Up @@ -229,6 +241,8 @@ func newMetrics() (*prometheus.Registry, *Metrics, error) {
r.Register(m.emptyBlockBuilt),
r.Register(m.clearedMempool),
r.Register(m.deletedBlocks),
r.Register(m.blocksFromDisk),
r.Register(m.blocksHeightsFromDisk),
r.Register(m.bandwidthPrice),
r.Register(m.computePrice),
r.Register(m.storageReadPrice),
Expand Down
80 changes: 66 additions & 14 deletions vm/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/ava-labs/avalanchego/cache"
"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/choices"
"github.com/ava-labs/avalanchego/utils/crypto/bls"
"github.com/ava-labs/avalanchego/vms/platformvm/warp"
"go.uber.org/zap"
Expand All @@ -34,9 +35,11 @@ func init() {
}

const (
blockPrefix = 0x0
warpSignaturePrefix = 0x1
warpFetchPrefix = 0x2
blockPrefix = 0x0 // TODO: move to flat files (https://github.com/ava-labs/hypersdk/issues/553)
blockIDHeightPrefix = 0x1 // ID -> Height
blockHeightIDPrefix = 0x2 // Height -> ID (don't always need full block from disk)
warpSignaturePrefix = 0x3
warpFetchPrefix = 0x4
)

var (
Expand All @@ -46,19 +49,33 @@ var (
signatureLRU = &cache.LRU[string, *chain.WarpSignature]{Size: 1024}
)

func PrefixBlockHeightKey(height uint64) []byte {
func PrefixBlockKey(height uint64) []byte {
k := make([]byte, 1+consts.Uint64Len)
k[0] = blockPrefix
binary.BigEndian.PutUint64(k[1:], height)
return k
}

func PrefixBlockIDHeightKey(id ids.ID) []byte {
k := make([]byte, 1+consts.IDLen)
k[0] = blockIDHeightPrefix
copy(k[1:], id[:])
return k
}

func PrefixBlockHeightIDKey(height uint64) []byte {
k := make([]byte, 1+consts.Uint64Len)
k[0] = blockHeightIDPrefix
binary.BigEndian.PutUint64(k[1:], height)
return k
}

func (vm *VM) HasGenesis() (bool, error) {
return vm.HasDiskBlock(0)
}

func (vm *VM) GetGenesis() (*chain.StatefulBlock, error) {
return vm.GetDiskBlock(0)
func (vm *VM) GetGenesis(ctx context.Context) (*chain.StatelessBlock, error) {
return vm.GetDiskBlock(ctx, 0)
}

func (vm *VM) SetLastAcceptedHeight(height uint64) error {
Expand Down Expand Up @@ -96,16 +113,35 @@ func (vm *VM) shouldComapct(expiryHeight uint64) bool {
// compaction as storing blocks randomly on-disk (when using [block.ID]).
func (vm *VM) UpdateLastAccepted(blk *chain.StatelessBlock) error {
batch := vm.vmDB.NewBatch()
if err := batch.Put(lastAccepted, binary.BigEndian.AppendUint64(nil, blk.Height())); err != nil {
bigEndianHeight := binary.BigEndian.AppendUint64(nil, blk.Height())
if err := batch.Put(lastAccepted, bigEndianHeight); err != nil {
return err
}
if err := batch.Put(PrefixBlockKey(blk.Height()), blk.Bytes()); err != nil {
return err
}
if err := batch.Put(PrefixBlockHeightKey(blk.Height()), blk.Bytes()); err != nil {
if err := batch.Put(PrefixBlockIDHeightKey(blk.ID()), bigEndianHeight); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to store this on-disk now so that we can service historical Get requests. We make this tradeoff so that we can avoid db compaction (from storing blocks all over disk).

return err
}
blkID := blk.ID()
if err := batch.Put(PrefixBlockHeightIDKey(blk.Height()), blkID[:]); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We store a height lookup to avoid pulling entire blocks from disk to service historical queries.

return err
}
expiryHeight := blk.Height() - uint64(vm.config.GetAcceptedBlockWindow())
var expired bool
if expiryHeight > 0 && expiryHeight < blk.Height() { // ensure we don't free genesis
if err := batch.Delete(PrefixBlockHeightKey(expiryHeight)); err != nil {
if err := batch.Delete(PrefixBlockKey(expiryHeight)); err != nil {
return err
}
blkID, err := vm.vmDB.Get(PrefixBlockHeightIDKey(expiryHeight))
if err == nil {
if err := batch.Delete(PrefixBlockIDHeightKey(ids.ID(blkID))); err != nil {
return err
}
} else {
vm.Logger().Warn("unable to delete blkID", zap.Uint64("height", expiryHeight), zap.Error(err))
}
if err := batch.Delete(PrefixBlockHeightIDKey(expiryHeight)); err != nil {
return err
}
expired = true
Expand All @@ -130,24 +166,40 @@ func (vm *VM) UpdateLastAccepted(blk *chain.StatelessBlock) error {
return nil
}

func (vm *VM) GetDiskBlock(height uint64) (*chain.StatefulBlock, error) {
b, err := vm.vmDB.Get(PrefixBlockHeightKey(height))
func (vm *VM) GetDiskBlock(ctx context.Context, height uint64) (*chain.StatelessBlock, error) {
b, err := vm.vmDB.Get(PrefixBlockKey(height))
if err != nil {
return nil, err
}
return chain.UnmarshalBlock(b, vm)
return chain.ParseBlock(ctx, b, choices.Accepted, vm)
}

func (vm *VM) HasDiskBlock(height uint64) (bool, error) {
return vm.vmDB.Has(PrefixBlockHeightKey(height))
return vm.vmDB.Has(PrefixBlockKey(height))
}

func (vm *VM) GetBlockHeightID(height uint64) (ids.ID, error) {
b, err := vm.vmDB.Get(PrefixBlockHeightIDKey(height))
if err != nil {
return ids.Empty, err
}
return ids.ID(b), nil
}

func (vm *VM) GetBlockIDHeight(blkID ids.ID) (uint64, error) {
b, err := vm.vmDB.Get(PrefixBlockIDHeightKey(blkID))
if err != nil {
return 0, err
}
return binary.BigEndian.Uint64(b), nil
}

// CompactDiskBlocks forces compaction on the entire range of blocks up to [lastExpired].
//
// This can be used to ensure we clean up all large tombstoned keys on a regular basis instead
// of waiting for the database to run a compaction (and potentially delete GBs of data at once).
func (vm *VM) CompactDiskBlocks(lastExpired uint64) error {
return vm.vmDB.Compact([]byte{blockPrefix}, PrefixBlockHeightKey(lastExpired))
return vm.vmDB.Compact([]byte{blockPrefix}, PrefixBlockKey(lastExpired))
}

func (vm *VM) GetDiskIsSyncing() (bool, error) {
Expand Down
Loading
Loading