From defff9740a6505ba7412799f6204daca653da4e2 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Mon, 16 Oct 2023 15:13:40 -0700 Subject: [PATCH] [chain] Parallel Transaction Execution During Building and Verification (#560) * start planning changes * add more notes * update default map sizes in executor * layout prefetch * add stop/error return to executor * add locking to fee manager * redesign tstate functionality to allow for parallel exec * update chan transaction * update chain block * fix builder * fix feeManger locking * make sure to default to creation allowed * fix cache escape * integration tests passing * change var names * remove prints * pre-allocate tstate_view memory * make memory usage tighter * progress * fix fee manager limit * make tx execution cores configurable * update vm resolutions * make execution concurrency configurable * remove unused struct * executor tests passing * add test for err and stop * add missing licenses * all tests passing * use ExportMerkleView * add executor metrics * add prometheus charts * remove unnecesary else * update name of ExportMerkleView * update executor interface * finish parallel transaction execution section * add programs section --- README.md | 90 ++-- chain/block.go | 19 +- chain/builder.go | 488 +++++++++--------- chain/dependencies.go | 4 + chain/fee_manager.go | 81 ++- chain/processor.go | 243 ++++----- chain/transaction.go | 2 +- config/config.go | 15 +- .../cmd/morpheus-cli/cmd/prometheus.go | 6 + examples/morpheusvm/config/config.go | 24 +- examples/morpheusvm/scripts/run.sh | 4 +- examples/morpheusvm/tests/load/load_test.go | 6 +- .../tokenvm/cmd/token-cli/cmd/prometheus.go | 6 + examples/tokenvm/config/config.go | 24 +- examples/tokenvm/scripts/deploy.devnet.sh | 3 + examples/tokenvm/scripts/run.sh | 4 +- examples/tokenvm/tests/load/load_test.go | 6 +- executor/dependencies.go | 9 + executor/errors.go | 8 + executor/executor.go | 86 ++- executor/executor_test.go | 84 ++- tstate/tstate.go | 284 +--------- tstate/tstate_test.go | 282 +++++----- tstate/tstate_view.go | 265 ++++++++++ vm/dependencies.go | 11 +- vm/metrics.go | 104 +++- vm/resolutions.go | 13 + vm/vm.go | 8 +- 28 files changed, 1196 insertions(+), 983 deletions(-) create mode 100644 executor/dependencies.go create mode 100644 executor/errors.go create mode 100644 tstate/tstate_view.go diff --git a/README.md b/README.md index 9b3098e58d..3009715d5c 100644 --- a/README.md +++ b/README.md @@ -83,37 +83,28 @@ a bandwidth-aware dynamic sync implementation provided by `avalanchego`, to sync to the tip of any `hyperchain`. #### Block Pruning -By default, the `hypersdk` only stores what is necessary to build/verfiy the next block -and to help new nodes sync the current state (not execute all historical state transitions). -If the `hypersdk` did not limit block storage grwoth, the storage requirements for validators +The `hypersdk` defaults to only storing what is necessary to build/verify the next block +and to help new nodes sync the current state (not execute historical state transitions). +If the `hypersdk` did not limit block storage growth, the disk requirements for validators would grow at an alarming rate each day (making running any `hypervm` impractical). Consider the simple example where we process 25k transactions per second (assume each -transaction is ~400 bytes). 
This would would require the `hypersdk` to store 10MB per
+transaction is ~400 bytes); this would require the `hypersdk` to store 10MB per
second (not including any overhead in the database for doing so). **This works out to
864GB per day or 315.4TB per year.**
-In practice, this means the `hypersdk` only stores the last 768 accepted blocks the genesis block,
-and the last 256 revisions of state (the [ProposerVM](https://github.com/ava-labs/avalanchego/blob/master/vms/proposervm/README.md)
-also stores the last 768 blocks). With a 100ms `MinimumBlockGap`, the `hypersdk` must
-store at least ~600 blocks to allow for the entire `ValidityWindow` to be backfilled (otherwise
+When `MinimumBlockGap=250ms` (minimum time between blocks), the `hypersdk` must store at
+least ~240 blocks to allow for the entire `ValidityWindow` to be backfilled (otherwise
a fully-synced, restarting `hypervm` will not become "ready" until it accepts a block at
-least `ValidityWindow` after the last accepted block).
+least `ValidityWindow` after the last accepted block). To provide some room for error during
+disaster recovery (network outage), however, it is recommended to configure the `hypersdk` to
+store the last >= ~50,000 accepted blocks (~3.5 hours of activity with a 250ms `MinimumBlockGap`).
+This allows archival nodes that become disconnected from the network (due to a data center outage or bug)
+to ensure they can persist all historical blocks (which would otherwise be deleted by all participants and
+unindexable).
 
-_The number of blocks and/or state revisions that the `hypersdk` stores, the `AcceptedBlockWindow`, can
-be tuned by any `hypervm`. It is not possible, however, to configure the `hypersdk` to store
-all historical blocks (the `AcceptedBlockWindow` is pinned to memory)._
-
-#### PebbleDB
-Instead of employing [`goleveldb`](https://github.com/syndtr/goleveldb), the
-`hypersdk` uses CockroachDB's [`pebble`](https://github.com/cockroachdb/pebble) database for
-on-disk storage. This database is inspired by LevelDB/RocksDB but offers [a few
-improvements](https://github.com/cockroachdb/pebble#advantages).
-
-Unlike other Avalanche VMs, which store data inside `avalanchego's` root
-database, `hypervms` store different types of data (state, blocks, metadata, etc.) under
-a set of distinct paths in `avalanchego's` provided `chainData` directory.
-This structure enables anyone running a `hypervm` to employ multiple logical disk
-drives to increase a `hyperchain's` throughput (which may otherwise be capped by a single disk's IO).
+_The number of blocks that the `hypersdk` stores on-disk, the `AcceptedBlockWindow`, can be tuned by any `hypervm`
+to an arbitrary depth (or set to `MaxInt` to keep all blocks). To limit disk IO used to serve blocks over
+the P2P network, `hypervms` can configure `AcceptedBlockWindowCache` to store recent blocks in memory._
 
### Optimized Block Execution Out-of-the-Box
The `hypersdk` is primarily about an obsession with hyper-speed and
@@ -126,24 +117,24 @@ thus far has been dedicated to making block verification and state management
as fast and efficient as possible, which both play a large role in making this
happen.
 
-#### State Pre-Fetching
-`hypersdk` transactions must specify the keys they will touch in state (read
-or write) during execution and authentication so that all relevant data can be
-pre-fetched before block execution starts, which ensures all data accessed during
-verification of a block is done so in memory). 
Notably, the keys specified here -are not keys in a merkle trie (which may be quite volatile) but are instead the -actual keys used to access data by the storage engine (like your address, which -is much less volatile and not as cumbersome of a UX barrier). - -This restriction also enables transactions to be processed in parallel as distinct, -ordered transaction sets can be trivially formed by looking at the overlap of keys -that transactions will touch. - -_Parallel transaction execution was originally included in `hypersdk` but -removed because the overhead of the naïve mechanism used to group transactions -into execution sets prior to execution was slower than just executing transactions -serially with state pre-fetching. Rewriting this mechanism has been moved to the -`Future Work` section and we expect to re-enable this functionality soon._ +#### Parallel Transaction Execution +`hypersdk` transactions must specify the keys they will access in state (read +and/or write) during authentication and execution so that non-conflicting transactions +can be processed in parallel. To do this efficiently, the `hypersdk` uses +the [`executor`](https://github.com/ava-labs/hypersdk/tree/main/executor) package, which +can generate an execution plan for a set of transactions on-the-fly (no preprocessing required). +`executor` is used to parallelize execution in both block building and in block verification. + +When a `hypervm's` `Auth` and `Actions` are simple and pre-specified (like in the `morpheusvm`), +the primary benefit of parallel execution is to concurrently fetch the state needed for execution +(actual execution of precompiled golang only takes nanoseconds). However, parallel execution +massively speeds up the E2E execution of a block of `programs`, which may each take a few milliseconds +to process. Consider the simple scenario where a `program` takes 2 milliseconds; processing 1000 `programs` +in serial would take 2 seconds (far too long for a high-throughput blockchain). The same execution, however, +would only take 125 milliseconds if run over 16 cores (assuming no conflicts). + +_The number of cores that the `hypersdk` allocates to execution can be tuned by +any `hypervm` using the `TransactionExecutionCores` configuration._ #### Deferred Root Generation All `hypersdk` blocks include a state root to support dynamic state sync. In dynamic @@ -201,6 +192,18 @@ capability for any `Auth` module that implements the `AuthBatchVerifier` interfa even parallelizing batch computation for systems that only use a single-thread to verify a batch. +### WASM-Based Programs +In the `hypersdk`, [smart contracts](https://ethereum.org/en/developers/docs/smart-contracts/) +(e.g. programs that run on blockchains) are referred to simply as `programs`. `Programs` +are [WASM-based](https://webassembly.org/) binaries that can be invoked during block +execution to perform arbitrary state transitions. This is a more flexible, yet less performant, +alternative to defining all `Auth` and/or `Actions` that can be invoked in the `hypervm` in the +`hypervm's` code (like the `tokenvm`). + +Because the `hypersdk` can execute arbitrary WASM, any language (Rust, C, C++, Zig, etc.) that can +be compiled to WASM can be used to write `programs`. You can view a collection of +Rust-based `programs` [here](https://github.com/ava-labs/hypersdk/tree/main/x/programs/rust/examples). + ### Multidimensional Fee Pricing Instead of mapping transaction resource usage to a one-dimensional unit (i.e. 
"gas" or "fuel"), the `hypersdk` utilizes five independently parameterized unit dimensions @@ -1003,9 +1006,6 @@ _If you want to take the lead on any of these items, please [start a discussion](https://github.com/ava-labs/hypersdk/discussions) or reach out on the Avalanche Discord._ -* Use pre-specified state keys to process transactions in parallel (txs with no - overlap can be processed at the same time, create conflict sets on-the-fly - instead of before execution) * Add support for Fixed-Fee Accounts (pay set unit price no matter what) * Use a memory arena (pre-allocated memory) to avoid needing to dynamically allocate memory during block and transaction parsing diff --git a/chain/block.go b/chain/block.go index cf17b7af10..2bd187263d 100644 --- a/chain/block.go +++ b/chain/block.go @@ -579,12 +579,8 @@ func (b *StatelessBlock) innerVerify(ctx context.Context, vctx VerifyContext) er return err } - // Optimisticaly fetch view - processor := NewProcessor(b.vm.Tracer(), b) - processor.Prefetch(ctx, parentView) - - // Process new transactions - results, ts, err := processor.Execute(ctx, feeManager, r) + // Process transactions + results, ts, err := b.Execute(ctx, b.vm.Tracer(), parentView, feeManager, r) if err != nil { log.Error("failed to execute block", zap.Error(err)) return err @@ -613,20 +609,21 @@ func (b *StatelessBlock) innerVerify(ctx context.Context, vctx VerifyContext) er heightKeyStr := string(heightKey) timestampKeyStr := string(timestampKey) feeKeyStr := string(feeKey) - ts.SetScope(ctx, set.Of(heightKeyStr, timestampKeyStr, feeKeyStr), map[string][]byte{ + tsv := ts.NewView(set.Of(heightKeyStr, timestampKeyStr, feeKeyStr), map[string][]byte{ heightKeyStr: parentHeightRaw, timestampKeyStr: parentTimestampRaw, feeKeyStr: parentFeeManager.Bytes(), }) - if err := ts.Insert(ctx, heightKey, binary.BigEndian.AppendUint64(nil, b.Hght)); err != nil { + if err := tsv.Insert(ctx, heightKey, binary.BigEndian.AppendUint64(nil, b.Hght)); err != nil { return err } - if err := ts.Insert(ctx, timestampKey, binary.BigEndian.AppendUint64(nil, uint64(b.Tmstmp))); err != nil { + if err := tsv.Insert(ctx, timestampKey, binary.BigEndian.AppendUint64(nil, uint64(b.Tmstmp))); err != nil { return err } - if err := ts.Insert(ctx, feeKey, feeManager.Bytes()); err != nil { + if err := tsv.Insert(ctx, feeKey, feeManager.Bytes()); err != nil { return err } + tsv.Commit() // Compare state root // @@ -662,7 +659,7 @@ func (b *StatelessBlock) innerVerify(ctx context.Context, vctx VerifyContext) er // Get view from [tstate] after processing all state transitions b.vm.RecordStateChanges(ts.PendingChanges()) b.vm.RecordStateOperations(ts.OpIndex()) - view, err := ts.CreateView(ctx, parentView, b.vm.Tracer()) + view, err := ts.ExportMerkleDBView(ctx, b.vm.Tracer(), parentView) if err != nil { return err } diff --git a/chain/builder.go b/chain/builder.go index 99f7b4112d..9cc3375f4c 100644 --- a/chain/builder.go +++ b/chain/builder.go @@ -12,6 +12,7 @@ import ( "time" "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/ids" smblock "github.com/ava-labs/avalanchego/snow/engine/snowman/block" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/math" @@ -19,6 +20,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.uber.org/zap" + "github.com/ava-labs/hypersdk/executor" "github.com/ava-labs/hypersdk/keys" "github.com/ava-labs/hypersdk/tstate" ) @@ -105,29 +107,30 @@ func BuildBlock( maxUnits := r.GetMaxBlockUnits() targetUnits := 
r.GetWindowTargetUnits() - ts := tstate.New(changesEstimate) - var ( + ts = tstate.New(changesEstimate) oldestAllowed = nextTime - r.GetValidityWindow() mempool = vm.Mempool() + // restorable txs after block attempt finishes + restorableLock sync.Mutex + restorable = []*Transaction{} + + // cache contains keys already fetched from state that can be + // used during prefetching. + cacheLock sync.RWMutex + cache = map[string]*fetchData{} + + blockLock sync.RWMutex + warpAdded = uint(0) + start = time.Now() txsAttempted = 0 results = []*Result{} - warpCount = 0 vdrState = vm.ValidatorState() sm = vm.StateManager() - start = time.Now() - - // restorable txs after block attempt finishes - restorable = []*Transaction{} - - // alreadyFetched contains keys already fetched from state that can be - // used during prefetching. - alreadyFetched = map[string]*fetchData{} - // prepareStreamLock ensures we don't overwrite stream prefetching spawned // asynchronously. prepareStreamLock sync.Mutex @@ -145,274 +148,265 @@ func BuildBlock( b.vm.RecordClearedMempool() break } + ctx, executeSpan := vm.Tracer().Start(ctx, "chain.BuildBlock.Execute") - // Prefetch all transactions - // - // TODO: unify logic with https://github.com/ava-labs/hypersdk/blob/4e10b911c3cd88e0ccd8d9de5210515b1d3a3ac4/chain/processor.go#L44-L79 - var ( - readyTxs = make(chan *txData, len(txs)) - stopIndex = -1 - execErr error - ) - go func() { - ctx, prefetchSpan := vm.Tracer().Start(ctx, "chain.BuildBlock.Prefetch") - defer prefetchSpan.End() - defer close(readyTxs) - - for i, tx := range txs { - if execErr != nil { - stopIndex = i - return - } - - // Once we get part way through a prefetching job, we start - // to prepare for the next stream. - if i == streamPrefetchThreshold { - prepareStreamLock.Lock() - go func() { - mempool.PrepareStream(ctx, streamBatch) - prepareStreamLock.Unlock() - }() - } - - // Prefetch all values from state - storage := map[string][]byte{} - stateKeys, err := tx.StateKeys(sm) - if err != nil { - // Drop bad transaction and continue - // - // This should not happen because we check this before - // adding a transaction to the mempool. - continue - } - for k := range stateKeys { - if v, ok := alreadyFetched[k]; ok { - if v.exists { - storage[k] = v.v - } - continue - } - v, err := parentView.GetValue(ctx, []byte(k)) - if errors.Is(err, database.ErrNotFound) { - alreadyFetched[k] = &fetchData{nil, false, 0} - continue - } else if err != nil { - // This can happen if the underlying view changes (if we are - // verifying a block that can never be accepted). - execErr = err - stopIndex = i - return - } - numChunks, ok := keys.NumChunks(v) - if !ok { - // Drop bad transaction and continue - // - // This should not happen because we check this before - // adding a transaction to the mempool. - continue - } - alreadyFetched[k] = &fetchData{v, true, numChunks} - storage[k] = v - } - readyTxs <- &txData{tx, storage, nil, nil} - } - }() - - // Perform a batch repeat check while we are waiting for state prefetching + // Perform a batch repeat check dup, err := parent.IsRepeat(ctx, oldestAllowed, txs, set.NewBits(), false) if err != nil { - execErr = err + restorable = append(restorable, txs...) 
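+			// A repeat-check error is not specific to any single transaction, so we
+			// restore the entire batch and stop building.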
+ break } - // Execute transactions as they become ready - ctx, executeSpan := vm.Tracer().Start(ctx, "chain.BuildBlock.Execute") - txIndex := 0 - for nextTxData := range readyTxs { + e := executor.New(streamBatch, vm.GetTransactionExecutionCores(), vm.GetExecutorBuildRecorder()) + pending := make(map[ids.ID]*Transaction, streamBatch) + var pendingLock sync.Mutex + for li, ltx := range txs { txsAttempted++ - next := nextTxData.tx - if execErr != nil { - restorable = append(restorable, next) - continue - } + i := li + tx := ltx - // Skip if tx is a duplicate - if dup.Contains(txIndex) { + // Skip any duplicates before going async + if dup.Contains(i) { continue } - txIndex++ // Ensure we can process if transaction includes a warp message - if next.WarpMessage != nil && blockContext == nil { + if tx.WarpMessage != nil && blockContext == nil { log.Info( "dropping pending warp message because no context provided", - zap.Stringer("txID", next.ID()), - ) - restorable = append(restorable, next) - continue - } - - // Skip warp message if at max - if next.WarpMessage != nil && warpCount == MaxWarpMessages { - log.Info( - "dropping pending warp message because already have MaxWarpMessages", - zap.Stringer("txID", next.ID()), + zap.Stringer("txID", tx.ID()), ) - restorable = append(restorable, next) + restorableLock.Lock() + restorable = append(restorable, tx) + restorableLock.Unlock() continue } - // Ensure we have room - nextUnits, err := next.MaxUnits(sm, r) + stateKeys, err := tx.StateKeys(sm) if err != nil { - // Should never happen - log.Warn( - "skipping tx: invalid max units", - zap.Error(err), - ) + // Drop bad transaction and continue + // + // This should not happen because we check this before + // adding a transaction to the mempool. continue } - if ok, dimension := feeManager.CanConsume(nextUnits, maxUnits); !ok { - log.Debug( - "skipping tx: too many units", - zap.Int("dimension", int(dimension)), - zap.Uint64("tx", nextUnits[dimension]), - zap.Uint64("block units", feeManager.LastConsumed(dimension)), - zap.Uint64("max block units", maxUnits[dimension]), - ) - restorable = append(restorable, next) - // If we are above the target for the dimension we can't consume, we will - // stop building. This prevents a full mempool iteration looking for the - // "perfect fit". - if feeManager.LastConsumed(dimension) >= targetUnits[dimension] { - execErr = errBlockFull - } - continue + // Once we get part way through a prefetching job, we start + // to prepare for the next stream. + if i == streamPrefetchThreshold { + prepareStreamLock.Lock() + go func() { + mempool.PrepareStream(ctx, streamBatch) + prepareStreamLock.Unlock() + }() } - // Populate required transaction state and restrict which keys can be used - txStart := ts.OpIndex() - stateKeys, err := next.StateKeys(sm) - if err != nil { - // This should not happen because we check this before - // adding a transaction to the mempool. - log.Warn( - "skipping tx: invalid stateKeys", - zap.Error(err), + // We track pending transactions because an error may cause us + // not to execute restorable transactions. + pendingLock.Lock() + pending[tx.ID()] = tx + pendingLock.Unlock() + e.Run(stateKeys, func() error { + // We use defer here instead of covering all returns because it is + // much easier to manage. 
+ var restore bool + defer func() { + pendingLock.Lock() + delete(pending, tx.ID()) + pendingLock.Unlock() + + if !restore { + return + } + restorableLock.Lock() + restorable = append(restorable, tx) + restorableLock.Unlock() + }() + + // Fetch keys from cache + var ( + storage = make(map[string][]byte, len(stateKeys)) + toLookup = make([]string, 0, len(stateKeys)) ) - continue - } - ts.SetScope(ctx, stateKeys, nextTxData.storage) + cacheLock.RLock() + for k := range stateKeys { + if v, ok := cache[k]; ok { + if v.exists { + storage[k] = v.v + } + continue + } + toLookup = append(toLookup, k) + } + cacheLock.RUnlock() + + // Fetch keys from disk + var toCache map[string]*fetchData + if len(toLookup) > 0 { + toCache = make(map[string]*fetchData, len(toLookup)) + for _, k := range toLookup { + v, err := parentView.GetValue(ctx, []byte(k)) + if errors.Is(err, database.ErrNotFound) { + toCache[k] = &fetchData{nil, false, 0} + continue + } else if err != nil { + return err + } + // We verify that the [NumChunks] is already less than the number + // added on the write path, so we don't need to do so again here. + numChunks, ok := keys.NumChunks(v) + if !ok { + return ErrInvalidKeyValue + } + toCache[k] = &fetchData{v, true, numChunks} + storage[k] = v + } - // PreExecute next to see if it is fit - authCUs, err := next.PreExecute(ctx, feeManager, sm, r, ts, nextTime) - if err != nil { - ts.Rollback(ctx, txStart) - if HandlePreExecute(log, err) { - restorable = append(restorable, next) + // Update key cache regardless of whether exit is graceful + defer func() { + cacheLock.Lock() + for k := range toCache { + cache[k] = toCache[k] + } + cacheLock.Unlock() + }() } - continue - } - // Verify warp message, if it exists - // - // We don't drop invalid warp messages because we must collect fees for - // the work the sender made us do (otherwise this would be a DoS). - // - // We wait as long as possible to verify the signature to ensure we don't - // spend unnecessary time on an invalid tx. - var warpErr error - if next.WarpMessage != nil { - // We do not check the validity of [SourceChainID] because a VM could send - // itself a message to trigger a chain upgrade. - allowed, num, denom := r.GetWarpConfig(next.WarpMessage.SourceChainID) - if allowed { - warpErr = next.WarpMessage.Signature.Verify( - ctx, &next.WarpMessage.UnsignedMessage, r.NetworkID(), - vdrState, blockContext.PChainHeight, num, denom, - ) - } else { - warpErr = ErrDisabledChainID + // Execute block + tsv := ts.NewView(stateKeys, storage) + authCUs, err := tx.PreExecute(ctx, feeManager, sm, r, tsv, nextTime) + if err != nil { + // We don't need to rollback [tsv] here because it will never + // be committed. + if HandlePreExecute(log, err) { + restore = true + } + return nil } - if warpErr != nil { - log.Warn( - "warp verification failed", - zap.Stringer("txID", next.ID()), - zap.Error(warpErr), - ) + + // Verify warp message, if it exists + // + // We don't drop invalid warp messages because we must collect fees for + // the work the sender made us do (otherwise this would be a DoS). + // + // We wait as long as possible to verify the signature to ensure we don't + // spend unnecessary time on an invalid tx. + var warpErr error + if tx.WarpMessage != nil { + // We do not check the validity of [SourceChainID] because a VM could send + // itself a message to trigger a chain upgrade. 
+ allowed, num, denom := r.GetWarpConfig(tx.WarpMessage.SourceChainID) + if allowed { + warpErr = tx.WarpMessage.Signature.Verify( + ctx, &tx.WarpMessage.UnsignedMessage, r.NetworkID(), + vdrState, blockContext.PChainHeight, num, denom, + ) + } else { + warpErr = ErrDisabledChainID + } + if warpErr != nil { + log.Warn( + "warp verification failed", + zap.Stringer("txID", tx.ID()), + zap.Error(warpErr), + ) + } } - } - // If execution works, keep moving forward with new state - // - // Note, these calculations must match block verification exactly - // otherwise they will produce a different state root. - coldReads := map[string]uint16{} - warmReads := map[string]uint16{} - var invalidStateKeys bool - for k := range stateKeys { - v := nextTxData.storage[k] - numChunks, ok := keys.NumChunks(v) - if !ok { - invalidStateKeys = true - break + // If execution works, keep moving forward with new state + // + // Note, these calculations must match block verification exactly + // otherwise they will produce a different state root. + blockLock.RLock() + coldReads := make(map[string]uint16, len(stateKeys)) + warmReads := make(map[string]uint16, len(stateKeys)) + var invalidStateKeys bool + for k := range stateKeys { + v := storage[k] + numChunks, ok := keys.NumChunks(v) + if !ok { + invalidStateKeys = true + break + } + if usedKeys.Contains(k) { + warmReads[k] = numChunks + continue + } + coldReads[k] = numChunks } - if usedKeys.Contains(k) { - warmReads[k] = numChunks - continue + blockLock.RUnlock() + if invalidStateKeys { + // This should not happen because we check this before + // adding a transaction to the mempool. + log.Warn("invalid tx: invalid state keys") + return nil + } + result, err := tx.Execute( + ctx, + feeManager, + authCUs, + coldReads, + warmReads, + sm, + r, + tsv, + nextTime, + tx.WarpMessage != nil && warpErr == nil, + ) + if err != nil { + // Returning an error here should be avoided at all costs (can be a DoS). Rather, + // all units for the transaction should be consumed and a fee should be charged. + log.Warn("unexpected post-execution error", zap.Error(err)) + restore = true + return err } - coldReads[k] = numChunks - } - if invalidStateKeys { - // This should not happen because we check this before - // adding a transaction to the mempool. - log.Warn("invalid tx: invalid state keys") - continue - } - result, err := next.Execute( - ctx, - feeManager, - authCUs, - coldReads, - warmReads, - sm, - r, - ts, - nextTime, - next.WarpMessage != nil && warpErr == nil, - ) - if err != nil { - // Returning an error here should be avoided at all costs (can be a DoS). Rather, - // all units for the transaction should be consumed and a fee should be charged. - log.Warn("unexpected post-execution error", zap.Error(err)) - restorable = append(restorable, next) - execErr = err - continue - } - // Update block with new transaction - b.Txs = append(b.Txs, next) - usedKeys.Add(stateKeys.List()...) 
- if err := feeManager.Consume(result.Consumed); err != nil { - execErr = err - continue - } - results = append(results, result) - if next.WarpMessage != nil { - if warpErr == nil { - // Add a bit if the warp message was verified - b.WarpResults.Add(uint(warpCount)) + // Need to atomically check there aren't too many warp messages and add to block + blockLock.Lock() + defer blockLock.Unlock() + + // Ensure block isn't too big + if ok, dimension := feeManager.Consume(result.Consumed, maxUnits); !ok { + log.Debug( + "skipping tx: too many units", + zap.Int("dimension", int(dimension)), + zap.Uint64("tx", result.Consumed[dimension]), + zap.Uint64("block units", feeManager.LastConsumed(dimension)), + zap.Uint64("max block units", maxUnits[dimension]), + ) + restore = true + + // If we are above the target for the dimension we can't consume, we will + // stop building. This prevents a full mempool iteration looking for the + // "perfect fit". + if feeManager.LastConsumed(dimension) >= targetUnits[dimension] { + return errBlockFull + } } - warpCount++ - } + + // Update block with new transaction + tsv.Commit() + b.Txs = append(b.Txs, tx) + results = append(results, result) + usedKeys.Add(stateKeys.List()...) + if tx.WarpMessage != nil { + if warpErr == nil { + // Add a bit if the warp message was verified + b.WarpResults.Add(warpAdded) + } + warpAdded++ + } + return nil + }) } + execErr := e.Wait() executeSpan.End() // Handle execution result if execErr != nil { - if stopIndex >= 0 { - // If we stopped prefetching, make sure to add those txs back - restorable = append(restorable, txs[stopIndex:]...) + for _, tx := range pending { + // If we stopped executing, make sure to add those txs back + restorable = append(restorable, tx) } if !errors.Is(execErr, errBlockFull) { // Wait for stream preparation to finish to make @@ -460,20 +454,21 @@ func BuildBlock( timestampKey := TimestampKey(b.vm.StateManager().TimestampKey()) timestampKeyStr := string(timestampKey) feeKeyStr := string(feeKey) - ts.SetScope(ctx, set.Of(heightKeyStr, timestampKeyStr, feeKeyStr), map[string][]byte{ + tsv := ts.NewView(set.Of(heightKeyStr, timestampKeyStr, feeKeyStr), map[string][]byte{ heightKeyStr: binary.BigEndian.AppendUint64(nil, parent.Hght), timestampKeyStr: binary.BigEndian.AppendUint64(nil, uint64(parent.Tmstmp)), feeKeyStr: parentFeeManager.Bytes(), }) - if err := ts.Insert(ctx, heightKey, binary.BigEndian.AppendUint64(nil, b.Hght)); err != nil { + if err := tsv.Insert(ctx, heightKey, binary.BigEndian.AppendUint64(nil, b.Hght)); err != nil { return nil, fmt.Errorf("%w: unable to insert height", err) } - if err := ts.Insert(ctx, timestampKey, binary.BigEndian.AppendUint64(nil, uint64(b.Tmstmp))); err != nil { + if err := tsv.Insert(ctx, timestampKey, binary.BigEndian.AppendUint64(nil, uint64(b.Tmstmp))); err != nil { return nil, fmt.Errorf("%w: unable to insert timestamp", err) } - if err := ts.Insert(ctx, feeKey, feeManager.Bytes()); err != nil { + if err := tsv.Insert(ctx, feeKey, feeManager.Bytes()); err != nil { return nil, fmt.Errorf("%w: unable to insert fees", err) } + tsv.Commit() // Fetch [parentView] root as late as possible to allow // for async processing to complete @@ -484,13 +479,14 @@ func BuildBlock( b.StateRoot = root // Get view from [tstate] after writing all changed keys - view, err := ts.CreateView(ctx, parentView, vm.Tracer()) + view, err := ts.ExportMerkleDBView(ctx, vm.Tracer(), parentView) if err != nil { return nil, err } // Compute block hash and marshaled representation if err := 
b.initializeBuilt(ctx, view, results, feeManager); err != nil { + log.Warn("block failed", zap.Int("txs", len(b.Txs)), zap.Any("consumed", feeManager.UnitsConsumed())) return nil, err } diff --git a/chain/dependencies.go b/chain/dependencies.go index 8373f3717d..4175b761d1 100644 --- a/chain/dependencies.go +++ b/chain/dependencies.go @@ -17,6 +17,7 @@ import ( "github.com/ava-labs/avalanchego/x/merkledb" "github.com/ava-labs/hypersdk/codec" + "github.com/ava-labs/hypersdk/executor" "github.com/ava-labs/hypersdk/state" "github.com/ava-labs/hypersdk/workers" ) @@ -56,6 +57,7 @@ type VM interface { Mempool() Mempool IsRepeat(context.Context, []*Transaction, set.Bits, bool) set.Bits GetTargetBuildDuration() time.Duration + GetTransactionExecutionCores() int Verified(context.Context, *StatelessBlock) Rejected(context.Context, *StatelessBlock) @@ -81,6 +83,8 @@ type VM interface { RecordBuildCapped() RecordEmptyBlockBuilt() RecordClearedMempool() + GetExecutorBuildRecorder() executor.Metrics + GetExecutorVerifyRecorder() executor.Metrics } type VerifyContext interface { diff --git a/chain/fee_manager.go b/chain/fee_manager.go index 8f31f2f91f..0317dfc726 100644 --- a/chain/fee_manager.go +++ b/chain/fee_manager.go @@ -7,6 +7,7 @@ import ( "encoding/binary" "fmt" "strconv" + "sync" "github.com/ava-labs/avalanchego/utils/math" "github.com/ava-labs/hypersdk/consts" @@ -31,7 +32,9 @@ type ( Dimensions [FeeDimensions]uint64 ) +// FeeManager is safe for concurrent use type FeeManager struct { + l sync.RWMutex raw []byte } @@ -39,25 +42,49 @@ func NewFeeManager(raw []byte) *FeeManager { if len(raw) == 0 { raw = make([]byte, FeeDimensions*dimensionStateLen) } - return &FeeManager{raw} + return &FeeManager{raw: raw} } func (f *FeeManager) UnitPrice(d Dimension) uint64 { + f.l.RLock() + defer f.l.RUnlock() + + return f.unitPrice(d) +} + +func (f *FeeManager) unitPrice(d Dimension) uint64 { start := dimensionStateLen * d return binary.BigEndian.Uint64(f.raw[start : start+consts.Uint64Len]) } func (f *FeeManager) Window(d Dimension) window.Window { + f.l.RLock() + defer f.l.RUnlock() + + return f.window(d) +} + +func (f *FeeManager) window(d Dimension) window.Window { start := dimensionStateLen*d + consts.Uint64Len return window.Window(f.raw[start : start+window.WindowSliceSize]) } func (f *FeeManager) LastConsumed(d Dimension) uint64 { + f.l.RLock() + defer f.l.RUnlock() + + return f.lastConsumed(d) +} + +func (f *FeeManager) lastConsumed(d Dimension) uint64 { start := dimensionStateLen*d + consts.Uint64Len + window.WindowSliceSize return binary.BigEndian.Uint64(f.raw[start : start+consts.Uint64Len]) } func (f *FeeManager) ComputeNext(lastTime int64, currTime int64, r Rules) (*FeeManager, error) { + f.l.RLock() + defer f.l.RUnlock() + targetUnits := r.GetWindowTargetUnits() unitPriceChangeDenom := r.GetUnitPriceChangeDenominator() minUnitPrice := r.GetMinUnitPrice() @@ -85,18 +112,36 @@ func (f *FeeManager) ComputeNext(lastTime int64, currTime int64, r Rules) (*FeeM } func (f *FeeManager) SetUnitPrice(d Dimension, price uint64) { + f.l.Lock() + defer f.l.Unlock() + + f.setUnitPrice(d, price) +} + +func (f *FeeManager) setUnitPrice(d Dimension, price uint64) { start := dimensionStateLen * d binary.BigEndian.PutUint64(f.raw[start:start+consts.Uint64Len], price) } func (f *FeeManager) SetLastConsumed(d Dimension, consumed uint64) { + f.l.Lock() + defer f.l.Unlock() + + f.setLastConsumed(d, consumed) +} + +func (f *FeeManager) setLastConsumed(d Dimension, consumed uint64) { start := dimensionStateLen*d + 
consts.Uint64Len + window.WindowSliceSize binary.BigEndian.PutUint64(f.raw[start:start+consts.Uint64Len], consumed) } -func (f *FeeManager) CanConsume(d Dimensions, l Dimensions) (bool, Dimension) { +func (f *FeeManager) Consume(d Dimensions, l Dimensions) (bool, Dimension) { + f.l.Lock() + defer f.l.Unlock() + + // Ensure we can consume (don't want partial update of values) for i := Dimension(0); i < FeeDimensions; i++ { - consumed, err := math.Add64(f.LastConsumed(i), d[i]) + consumed, err := math.Add64(f.lastConsumed(i), d[i]) if err != nil { return false, i } @@ -104,28 +149,32 @@ func (f *FeeManager) CanConsume(d Dimensions, l Dimensions) (bool, Dimension) { return false, i } } - return true, -1 -} -func (f *FeeManager) Consume(d Dimensions) error { + // Commit to consumption for i := Dimension(0); i < FeeDimensions; i++ { - consumed, err := math.Add64(f.LastConsumed(i), d[i]) + consumed, err := math.Add64(f.lastConsumed(i), d[i]) if err != nil { - return err + return false, i } - f.SetLastConsumed(i, consumed) + f.setLastConsumed(i, consumed) } - return nil + return true, 0 } func (f *FeeManager) Bytes() []byte { + f.l.RLock() + defer f.l.RUnlock() + return f.raw } func (f *FeeManager) MaxFee(d Dimensions) (uint64, error) { + f.l.RLock() + defer f.l.RUnlock() + fee := uint64(0) for i := Dimension(0); i < FeeDimensions; i++ { - contribution, err := math.Mul64(f.UnitPrice(i), d[i]) + contribution, err := math.Mul64(f.unitPrice(i), d[i]) if err != nil { return 0, err } @@ -139,17 +188,23 @@ func (f *FeeManager) MaxFee(d Dimensions) (uint64, error) { } func (f *FeeManager) UnitPrices() Dimensions { + f.l.RLock() + defer f.l.RUnlock() + var d Dimensions for i := Dimension(0); i < FeeDimensions; i++ { - d[i] = f.UnitPrice(i) + d[i] = f.unitPrice(i) } return d } func (f *FeeManager) UnitsConsumed() Dimensions { + f.l.RLock() + defer f.l.RUnlock() + var d Dimensions for i := Dimension(0); i < FeeDimensions; i++ { - d[i] = f.LastConsumed(i) + d[i] = f.lastConsumed(i) } return d } diff --git a/chain/processor.go b/chain/processor.go index 2fa4ad8e62..c8595a7723 100644 --- a/chain/processor.go +++ b/chain/processor.go @@ -7,10 +7,12 @@ import ( "context" "errors" "fmt" + "sync" "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/trace" + "github.com/ava-labs/hypersdk/executor" "github.com/ava-labs/hypersdk/keys" "github.com/ava-labs/hypersdk/state" "github.com/ava-labs/hypersdk/tstate" @@ -23,157 +25,136 @@ type fetchData struct { chunks uint16 } -type txData struct { - tx *Transaction - storage map[string][]byte - - coldReads map[string]uint16 - warmReads map[string]uint16 -} - -type Processor struct { - tracer trace.Tracer - - err error - blk *StatelessBlock - readyTxs chan *txData - im state.Immutable -} +func (b *StatelessBlock) Execute( + ctx context.Context, + tracer trace.Tracer, //nolint:interfacer + im state.Immutable, + feeManager *FeeManager, + r Rules, +) ([]*Result, *tstate.TState, error) { + ctx, span := tracer.Start(ctx, "Processor.Execute") + defer span.End() -// Only prepare for population if above last accepted height -func NewProcessor(tracer trace.Tracer, b *StatelessBlock) *Processor { - return &Processor{ - tracer: tracer, + var ( + sm = b.vm.StateManager() + numTxs = len(b.Txs) + t = b.GetTimestamp() + cacheLock sync.RWMutex + cache = make(map[string]*fetchData, numTxs) + + e = executor.New(numTxs, b.vm.GetTransactionExecutionCores(), b.vm.GetExecutorVerifyRecorder()) + ts = tstate.New(numTxs * 2) // TODO: tune this heuristic + results = 
make([]*Result, numTxs) + ) - blk: b, - readyTxs: make(chan *txData, len(b.GetTxs())), - } -} + // Fetch required keys and execute transactions + for li, ltx := range b.Txs { + i := li + tx := ltx -func (p *Processor) Prefetch(ctx context.Context, im state.Immutable) { - ctx, span := p.tracer.Start(ctx, "Processor.Prefetch") - p.im = im - sm := p.blk.vm.StateManager() - go func() { - defer func() { - close(p.readyTxs) // let caller know all sets have been readied - span.End() - }() - - // Store required keys for each set - alreadyFetched := make(map[string]*fetchData, len(p.blk.GetTxs())) - for _, tx := range p.blk.GetTxs() { - coldReads := map[string]uint16{} - warmReads := map[string]uint16{} - storage := map[string][]byte{} - stateKeys, err := tx.StateKeys(sm) - if err != nil { - p.err = err - return - } + stateKeys, err := tx.StateKeys(sm) + if err != nil { + e.Stop() + return nil, nil, err + } + e.Run(stateKeys, func() error { + // Fetch keys from cache + var ( + coldReads = make(map[string]uint16, len(stateKeys)) + warmReads = make(map[string]uint16, len(stateKeys)) + storage = make(map[string][]byte, len(stateKeys)) + toLookup = make([]string, 0, len(stateKeys)) + ) + cacheLock.RLock() for k := range stateKeys { - if v, ok := alreadyFetched[k]; ok { + if v, ok := cache[k]; ok { warmReads[k] = v.chunks if v.exists { storage[k] = v.v } continue } - v, err := im.GetValue(ctx, []byte(k)) - if errors.Is(err, database.ErrNotFound) { - coldReads[k] = 0 - alreadyFetched[k] = &fetchData{nil, false, 0} - continue - } else if err != nil { - p.err = err - return - } - // We verify that the [NumChunks] is already less than the number - // added on the write path, so we don't need to do so again here. - numChunks, ok := keys.NumChunks(v) - if !ok { - p.err = ErrInvalidKeyValue - return + toLookup = append(toLookup, k) + } + cacheLock.RUnlock() + + // Fetch keys from disk + var toCache map[string]*fetchData + if len(toLookup) > 0 { + toCache = make(map[string]*fetchData, len(toLookup)) + for _, k := range toLookup { + v, err := im.GetValue(ctx, []byte(k)) + if errors.Is(err, database.ErrNotFound) { + coldReads[k] = 0 + toCache[k] = &fetchData{nil, false, 0} + continue + } else if err != nil { + return err + } + // We verify that the [NumChunks] is already less than the number + // added on the write path, so we don't need to do so again here. 
+ numChunks, ok := keys.NumChunks(v) + if !ok { + return ErrInvalidKeyValue + } + coldReads[k] = numChunks + toCache[k] = &fetchData{v, true, numChunks} + storage[k] = v } - coldReads[k] = numChunks - alreadyFetched[k] = &fetchData{v, true, numChunks} - storage[k] = v } - p.readyTxs <- &txData{tx, storage, coldReads, warmReads} - } - }() -} -func (p *Processor) Execute( - ctx context.Context, - feeManager *FeeManager, - r Rules, -) ([]*Result, *tstate.TState, error) { - ctx, span := p.tracer.Start(ctx, "Processor.Execute") - defer span.End() + // Execute transaction + // + // It is critical we explicitly set the scope before each transaction is + // processed + tsv := ts.NewView(stateKeys, storage) - var ( - ts = tstate.New(len(p.blk.Txs) * 2) // TODO: tune this heuristic - t = p.blk.GetTimestamp() - results = []*Result{} - sm = p.blk.vm.StateManager() - ) - for txData := range p.readyTxs { - if p.err != nil { - return nil, nil, p.err - } + // Ensure we have enough funds to pay fees + authCUs, err := tx.PreExecute(ctx, feeManager, sm, r, tsv, t) + if err != nil { + return err + } - tx := txData.tx + // Wait to execute transaction until we have the warp result processed. + var warpVerified bool + warpMsg, ok := b.warpMessages[tx.ID()] + if ok { + select { + case warpVerified = <-warpMsg.verifiedChan: + case <-ctx.Done(): + return ctx.Err() + } + } + result, err := tx.Execute(ctx, feeManager, authCUs, coldReads, warmReads, sm, r, tsv, t, ok && warpVerified) + if err != nil { + return err + } + results[i] = result - // Ensure can process next tx - nextUnits, err := tx.MaxUnits(sm, r) - if err != nil { - return nil, nil, err - } - if ok, dimension := feeManager.CanConsume(nextUnits, r.GetMaxBlockUnits()); !ok { - return nil, nil, fmt.Errorf("dimension %d exceeds limit", dimension) - } + // Update block metadata with units actually consumed (if more is consumed than block allows, we will non-deterministically + // exit with an error based on which tx over the limit is processed first) + if ok, d := feeManager.Consume(result.Consumed, r.GetMaxBlockUnits()); !ok { + return fmt.Errorf("%w: %d too large", ErrInvalidUnitsConsumed, d) + } - // It is critical we explicitly set the scope before each transaction is - // processed - stateKeys, err := tx.StateKeys(sm) - if err != nil { - return nil, nil, err - } - ts.SetScope(ctx, stateKeys, txData.storage) + // Commit results to parent [TState] + tsv.Commit() - // Execute tx - authCUs, err := tx.PreExecute(ctx, feeManager, sm, r, ts, t) - if err != nil { - return nil, nil, err - } - // Wait to execute transaction until we have the warp result processed. - // - // TODO: parallel execution will greatly improve performance when actions - // start taking longer than a few ns (i.e. with hypersdk programs). 
- var warpVerified bool - warpMsg, ok := p.blk.warpMessages[tx.ID()] - if ok { - select { - case warpVerified = <-warpMsg.verifiedChan: - case <-ctx.Done(): - return nil, nil, ctx.Err() + // Update key cache + if len(toCache) > 0 { + cacheLock.Lock() + for k := range toCache { + cache[k] = toCache[k] + } + cacheLock.Unlock() } - } - result, err := tx.Execute(ctx, feeManager, authCUs, txData.coldReads, txData.warmReads, sm, r, ts, t, ok && warpVerified) - if err != nil { - return nil, nil, err - } - results = append(results, result) - - // Update block metadata with units actually consumed - if err := feeManager.Consume(result.Consumed); err != nil { - return nil, nil, err - } + return nil + }) } - // Wait until end to write changes to avoid conflicting with pre-fetching - if p.err != nil { - return nil, nil, p.err + if err := e.Wait(); err != nil { + return nil, nil, err } + + // Return tstate that can be used to add block-level keys to state return results, ts, nil } diff --git a/chain/transaction.go b/chain/transaction.go index d029a8c89a..d3ad0244ad 100644 --- a/chain/transaction.go +++ b/chain/transaction.go @@ -346,7 +346,7 @@ func (t *Transaction) Execute( warmStorageReads map[string]uint16, s StateManager, r Rules, - ts *tstate.TState, + ts *tstate.TStateView, timestamp int64, warpVerified bool, ) (*Result, error) { diff --git a/config/config.go b/config/config.go index cc21364407..c14499b65e 100644 --- a/config/config.go +++ b/config/config.go @@ -5,7 +5,6 @@ package config import ( - "runtime" "time" "github.com/ava-labs/avalanchego/utils/logging" @@ -14,18 +13,12 @@ import ( "github.com/ava-labs/hypersdk/trace" ) -const avalancheGoMinCPU = 4 - type Config struct{} -func (c *Config) GetLogLevel() logging.Level { return logging.Info } -func (c *Config) GetParallelism() int { - numCPUs := runtime.NumCPU() - if numCPUs > avalancheGoMinCPU { - return numCPUs - avalancheGoMinCPU - } - return 1 -} +func (c *Config) GetLogLevel() logging.Level { return logging.Info } +func (c *Config) GetSignatureVerificationCores() int { return 1 } +func (c *Config) GetRootGenerationCores() int { return 1 } +func (c *Config) GetTransactionExecutionCores() int { return 1 } func (c *Config) GetMempoolSize() int { return 2_048 } func (c *Config) GetMempoolPayerSize() int { return 32 } func (c *Config) GetMempoolExemptPayers() [][]byte { return nil } diff --git a/examples/morpheusvm/cmd/morpheus-cli/cmd/prometheus.go b/examples/morpheusvm/cmd/morpheus-cli/cmd/prometheus.go index e16abaed3f..40850af663 100644 --- a/examples/morpheusvm/cmd/morpheus-cli/cmd/prometheus.go +++ b/examples/morpheusvm/cmd/morpheus-cli/cmd/prometheus.go @@ -152,6 +152,12 @@ var generatePrometheusCmd = &cobra.Command{ panels = append(panels, fmt.Sprintf("increase(avalanche_%s_vm_state_merkleDB_value_node_cache_hit[5s])/(increase(avalanche_%s_vm_state_merkleDB_value_node_cache_miss[5s]) + increase(avalanche_%s_vm_state_merkleDB_value_node_cache_hit[5s]))", chainID, chainID, chainID)) utils.Outf("{{yellow}}value node cache hit rate:{{/}} %s\n", panels[len(panels)-1]) + panels = append(panels, fmt.Sprintf("increase(avalanche_%s_vm_hypersdk_chain_executor_build_executable[5s]) / (increase(avalanche_%s_vm_hypersdk_chain_executor_build_blocked[5s]) + increase(avalanche_%s_vm_hypersdk_chain_executor_build_executable[5s]))", chainID, chainID, chainID)) + utils.Outf("{{yellow}}build txs executable (%%) per second:{{/}} %s\n", panels[len(panels)-1]) + + panels = append(panels, 
fmt.Sprintf("increase(avalanche_%s_vm_hypersdk_chain_executor_verify_executable[5s]) / (increase(avalanche_%s_vm_hypersdk_chain_executor_verify_blocked[5s]) + increase(avalanche_%s_vm_hypersdk_chain_executor_verify_executable[5s]))", chainID, chainID, chainID)) + utils.Outf("{{yellow}}verify txs executable (%%) per second:{{/}} %s\n", panels[len(panels)-1]) + return panels }) }, diff --git a/examples/morpheusvm/config/config.go b/examples/morpheusvm/config/config.go index 23ad2c4730..7cbdac3abf 100644 --- a/examples/morpheusvm/config/config.go +++ b/examples/morpheusvm/config/config.go @@ -32,6 +32,11 @@ const ( type Config struct { *config.Config + // Concurrency + SignatureVerificationCores int `json:"signatureVerificationCores"` + RootGenerationCores int `json:"rootGenerationCores"` + TransactionExecutionCores int `json:"transactionExecutionCores"` + // Tracing TraceEnabled bool `json:"traceEnabled"` TraceSampleRate float64 `json:"traceSampleRate"` @@ -52,7 +57,6 @@ type Config struct { StoreTransactions bool `json:"storeTransactions"` TestMode bool `json:"testMode"` // makes gossip/building manual LogLevel logging.Level `json:"logLevel"` - Parallelism int `json:"parallelism"` // State Sync StateSyncServerDelay time.Duration `json:"stateSyncServerDelay"` // for testing @@ -87,7 +91,9 @@ func New(nodeID ids.NodeID, b []byte) (*Config, error) { func (c *Config) setDefault() { c.LogLevel = c.Config.GetLogLevel() - c.Parallelism = c.Config.GetParallelism() + c.SignatureVerificationCores = c.Config.GetSignatureVerificationCores() + c.RootGenerationCores = c.Config.GetRootGenerationCores() + c.TransactionExecutionCores = c.Config.GetTransactionExecutionCores() c.MempoolSize = c.Config.GetMempoolSize() c.MempoolPayerSize = c.Config.GetMempoolPayerSize() c.StateSyncServerDelay = c.Config.GetStateSyncServerDelay() @@ -96,12 +102,14 @@ func (c *Config) setDefault() { c.StoreTransactions = defaultStoreTransactions } -func (c *Config) GetLogLevel() logging.Level { return c.LogLevel } -func (c *Config) GetTestMode() bool { return c.TestMode } -func (c *Config) GetParallelism() int { return c.Parallelism } -func (c *Config) GetMempoolSize() int { return c.MempoolSize } -func (c *Config) GetMempoolPayerSize() int { return c.MempoolPayerSize } -func (c *Config) GetMempoolExemptPayers() [][]byte { return c.parsedExemptPayers } +func (c *Config) GetLogLevel() logging.Level { return c.LogLevel } +func (c *Config) GetTestMode() bool { return c.TestMode } +func (c *Config) GetSignatureVerificationCores() int { return c.SignatureVerificationCores } +func (c *Config) GetRootGenerationCores() int { return c.RootGenerationCores } +func (c *Config) GetTransactionExecutionCores() int { return c.TransactionExecutionCores } +func (c *Config) GetMempoolSize() int { return c.MempoolSize } +func (c *Config) GetMempoolPayerSize() int { return c.MempoolPayerSize } +func (c *Config) GetMempoolExemptPayers() [][]byte { return c.parsedExemptPayers } func (c *Config) GetTraceConfig() *trace.Config { return &trace.Config{ Enabled: c.TraceEnabled, diff --git a/examples/morpheusvm/scripts/run.sh b/examples/morpheusvm/scripts/run.sh index d7a7352a6a..d3cde251a7 100755 --- a/examples/morpheusvm/scripts/run.sh +++ b/examples/morpheusvm/scripts/run.sh @@ -145,7 +145,9 @@ cat < ${TMPDIR}/morpheusvm.config "mempoolSize": 10000000, "mempoolPayerSize": 10000000, "mempoolExemptPayers":["morpheus1rvzhmceq997zntgvravfagsks6w0ryud3rylh4cdvayry0dl97nsp30ucp"], - "parallelism": 5, + "signatureVerificationCores": 2, + "rootGenerationCores": 
2, + "transactionExecutionCores": 2, "verifySignatures":true, "storeTransactions": ${STORE_TXS}, "streamingBacklogSize": 10000000, diff --git a/examples/morpheusvm/tests/load/load_test.go b/examples/morpheusvm/tests/load/load_test.go index 38bb38f459..e0d350606e 100644 --- a/examples/morpheusvm/tests/load/load_test.go +++ b/examples/morpheusvm/tests/load/load_test.go @@ -272,9 +272,11 @@ var _ = ginkgo.BeforeSuite(func() { nil, []byte( fmt.Sprintf( - `{%s"parallelism":%d, "mempoolSize":%d, "mempoolPayerSize":%d, "verifySignatures":%t, "testMode":true}`, + `{%s"signatureVerificationCores":%d, "rootGenerationCores":%d, "transactionExecutionCores":%d, "mempoolSize":%d, "mempoolPayerSize":%d, "verifySignatures":%t, "testMode":true}`, tracePrefix, - numWorkers, + numWorkers/3, + numWorkers/3, + numWorkers/3, txs, txs, verifySignatures, diff --git a/examples/tokenvm/cmd/token-cli/cmd/prometheus.go b/examples/tokenvm/cmd/token-cli/cmd/prometheus.go index cc75a8f063..f2ac1d94c9 100644 --- a/examples/tokenvm/cmd/token-cli/cmd/prometheus.go +++ b/examples/tokenvm/cmd/token-cli/cmd/prometheus.go @@ -145,6 +145,12 @@ var generatePrometheusCmd = &cobra.Command{ panels = append(panels, fmt.Sprintf("increase(avalanche_%s_vm_state_merkleDB_value_node_cache_hit[5s])/(increase(avalanche_%s_vm_state_merkleDB_value_node_cache_miss[5s]) + increase(avalanche_%s_vm_state_merkleDB_value_node_cache_hit[5s]))", chainID, chainID, chainID)) utils.Outf("{{yellow}}value node cache hit rate:{{/}} %s\n", panels[len(panels)-1]) + panels = append(panels, fmt.Sprintf("increase(avalanche_%s_vm_hypersdk_chain_executor_build_executable[5s]) / (increase(avalanche_%s_vm_hypersdk_chain_executor_build_blocked[5s]) + increase(avalanche_%s_vm_hypersdk_chain_executor_build_executable[5s]))", chainID, chainID, chainID)) + utils.Outf("{{yellow}}build txs executable (%%) per second:{{/}} %s\n", panels[len(panels)-1]) + + panels = append(panels, fmt.Sprintf("increase(avalanche_%s_vm_hypersdk_chain_executor_verify_executable[5s]) / (increase(avalanche_%s_vm_hypersdk_chain_executor_verify_blocked[5s]) + increase(avalanche_%s_vm_hypersdk_chain_executor_verify_executable[5s]))", chainID, chainID, chainID)) + utils.Outf("{{yellow}}verify txs executable (%%) per second:{{/}} %s\n", panels[len(panels)-1]) + return panels }) }, diff --git a/examples/tokenvm/config/config.go b/examples/tokenvm/config/config.go index 0db284c87e..92098a9cc0 100644 --- a/examples/tokenvm/config/config.go +++ b/examples/tokenvm/config/config.go @@ -34,6 +34,11 @@ const ( type Config struct { *config.Config + // Concurrency + SignatureVerificationCores int `json:"signatureVerificationCores"` + RootGenerationCores int `json:"rootGenerationCores"` + TransactionExecutionCores int `json:"transactionExecutionCores"` + // Gossip GossipMaxSize int `json:"gossipMaxSize"` GossipProposerDiff int `json:"gossipProposerDiff"` @@ -67,7 +72,6 @@ type Config struct { StoreTransactions bool `json:"storeTransactions"` TestMode bool `json:"testMode"` // makes gossip/building manual LogLevel logging.Level `json:"logLevel"` - Parallelism int `json:"parallelism"` // State Sync StateSyncServerDelay time.Duration `json:"stateSyncServerDelay"` // for testing @@ -108,7 +112,9 @@ func (c *Config) setDefault() { c.GossipProposerDepth = gcfg.GossipProposerDepth c.NoGossipBuilderDiff = gcfg.NoGossipBuilderDiff c.VerifyTimeout = gcfg.VerifyTimeout - c.Parallelism = c.Config.GetParallelism() + c.SignatureVerificationCores = c.Config.GetSignatureVerificationCores() + c.RootGenerationCores = 
c.Config.GetRootGenerationCores() + c.TransactionExecutionCores = c.Config.GetTransactionExecutionCores() c.MempoolSize = c.Config.GetMempoolSize() c.MempoolPayerSize = c.Config.GetMempoolPayerSize() c.StateSyncServerDelay = c.Config.GetStateSyncServerDelay() @@ -118,12 +124,14 @@ func (c *Config) setDefault() { c.MaxOrdersPerPair = defaultMaxOrdersPerPair } -func (c *Config) GetLogLevel() logging.Level { return c.LogLevel } -func (c *Config) GetTestMode() bool { return c.TestMode } -func (c *Config) GetParallelism() int { return c.Parallelism } -func (c *Config) GetMempoolSize() int { return c.MempoolSize } -func (c *Config) GetMempoolPayerSize() int { return c.MempoolPayerSize } -func (c *Config) GetMempoolExemptPayers() [][]byte { return c.parsedExemptPayers } +func (c *Config) GetLogLevel() logging.Level { return c.LogLevel } +func (c *Config) GetTestMode() bool { return c.TestMode } +func (c *Config) GetSignatureVerificationCores() int { return c.SignatureVerificationCores } +func (c *Config) GetRootGenerationCores() int { return c.RootGenerationCores } +func (c *Config) GetTransactionExecutionCores() int { return c.TransactionExecutionCores } +func (c *Config) GetMempoolSize() int { return c.MempoolSize } +func (c *Config) GetMempoolPayerSize() int { return c.MempoolPayerSize } +func (c *Config) GetMempoolExemptPayers() [][]byte { return c.parsedExemptPayers } func (c *Config) GetTraceConfig() *trace.Config { return &trace.Config{ Enabled: c.TraceEnabled, diff --git a/examples/tokenvm/scripts/deploy.devnet.sh b/examples/tokenvm/scripts/deploy.devnet.sh index bb301bc075..ed44551af6 100755 --- a/examples/tokenvm/scripts/deploy.devnet.sh +++ b/examples/tokenvm/scripts/deploy.devnet.sh @@ -137,6 +137,9 @@ cat < ${DEPLOY_ARTIFACT_PREFIX}/tokenvm-chain-config.json "mempoolPayerSize": 10000000, "mempoolExemptPayers":["token1rvzhmceq997zntgvravfagsks6w0ryud3rylh4cdvayry0dl97nsjzf3yp"], "streamingBacklogSize": 10000000, + "signatureVerificationCores": 4, + "rootGenerationCores": 4, + "transactionExecutionCores": 4, "storeTransactions": false, "verifySignatures": true, "trackedPairs":["*"], diff --git a/examples/tokenvm/scripts/run.sh b/examples/tokenvm/scripts/run.sh index 22d2c8beec..9bc09f8ebf 100755 --- a/examples/tokenvm/scripts/run.sh +++ b/examples/tokenvm/scripts/run.sh @@ -151,7 +151,9 @@ cat < ${TMPDIR}/tokenvm.config "mempoolSize": 10000000, "mempoolPayerSize": 10000000, "mempoolExemptPayers":["token1rvzhmceq997zntgvravfagsks6w0ryud3rylh4cdvayry0dl97nsjzf3yp"], - "parallelism": 5, + "signatureVerificationCores": 2, + "rootGenerationCores": 2, + "transactionExecutionCores": 2, "verifySignatures": true, "storeTransactions": ${STORE_TXS}, "streamingBacklogSize": 10000000, diff --git a/examples/tokenvm/tests/load/load_test.go b/examples/tokenvm/tests/load/load_test.go index fa52068adb..b77d0b09ec 100644 --- a/examples/tokenvm/tests/load/load_test.go +++ b/examples/tokenvm/tests/load/load_test.go @@ -265,9 +265,11 @@ var _ = ginkgo.BeforeSuite(func() { nil, []byte( fmt.Sprintf( - `{%s"parallelism":%d, "mempoolSize":%d, "mempoolPayerSize":%d, "testMode":true}`, + `{%s"signatureVerificationCores":%d, "rootGenerationCores":%d, "transactionExecutionCores":%d, "mempoolSize":%d, "mempoolPayerSize":%d, "testMode":true}`, tracePrefix, - numWorkers, + numWorkers/3, + numWorkers/3, + numWorkers/3, txs, txs, ), diff --git a/executor/dependencies.go b/executor/dependencies.go new file mode 100644 index 0000000000..883515b8ed --- /dev/null +++ b/executor/dependencies.go @@ -0,0 +1,9 @@ +// Copyright 
(C) 2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package executor + +type Metrics interface { + RecordBlocked() + RecordExecutable() +} diff --git a/executor/errors.go b/executor/errors.go new file mode 100644 index 0000000000..d679ced2f1 --- /dev/null +++ b/executor/errors.go @@ -0,0 +1,8 @@ +// Copyright (C) 2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package executor + +import "errors" + +var ErrStopped = errors.New("stopped") diff --git a/executor/executor.go b/executor/executor.go index 67dd7c102e..0770e7b784 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -18,9 +18,14 @@ const defaultSetSize = 8 // are executed in the order they were queued. // Tasks with no conflicts are executed immediately. type Executor struct { + metrics Metrics wg sync.WaitGroup executable chan *task + stop chan struct{} + err error + stopOnce sync.Once + l sync.Mutex done bool completed int @@ -29,11 +34,13 @@ type Executor struct { } // New creates a new [Executor]. -func New(items, concurrency int) *Executor { +func New(items, concurrency int, metrics Metrics) *Executor { e := &Executor{ - tasks: map[int]*task{}, - edges: map[string]int{}, - executable: make(chan *task, items), // ensure we don't block while holding lock + metrics: metrics, + stop: make(chan struct{}), + tasks: make(map[int]*task, items), + edges: make(map[string]int, items*2), // TODO: tune this + executable: make(chan *task, items), // ensure we don't block while holding lock } for i := 0; i < concurrency; i++ { e.createWorker() @@ -43,7 +50,7 @@ func New(items, concurrency int) *Executor { type task struct { id int - f func() + f func() error dependencies set.Set[int] blocking set.Set[int] @@ -57,34 +64,48 @@ func (e *Executor) createWorker() { go func() { defer e.wg.Done() - for t := range e.executable { - t.f() + for { + select { + case t, ok := <-e.executable: + if !ok { + return + } + if err := t.f(); err != nil { + e.stopOnce.Do(func() { + e.err = err + close(e.stop) + }) + return + } - e.l.Lock() - for b := range t.blocking { // works fine on non-initialized map - bt := e.tasks[b] - bt.dependencies.Remove(t.id) - if bt.dependencies.Len() == 0 { // must be non-nil to be blocked - bt.dependencies = nil // free memory - e.executable <- bt + e.l.Lock() + for b := range t.blocking { // works fine on non-initialized map + bt := e.tasks[b] + bt.dependencies.Remove(t.id) + if bt.dependencies.Len() == 0 { // must be non-nil to be blocked + bt.dependencies = nil // free memory + e.executable <- bt + } } + t.blocking = nil // free memory + t.executed = true + e.completed++ + if e.done && e.completed == len(e.tasks) { + // We will close here if there are unexecuted tasks + // when we call [Wait]. + close(e.executable) + } + e.l.Unlock() + case <-e.stop: + return } - t.blocking = nil // free memory - t.executed = true - e.completed++ - if e.done && e.completed == len(e.tasks) { - // We will close here if there are unexecuted tasks - // when we call [Wait]. - close(e.executable) - } - e.l.Unlock() } }() } // Run executes [f] after all previously enqueued [f] with // overlapping [conflicts] are executed. 
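+//
+// If any [f] returns an error, the first error encountered is recorded and
+// returned by [Wait].
+//
+// A minimal usage sketch (the [stateKeysOf] and [process] helpers below are
+// hypothetical and shown only to illustrate the API; [stateKeysOf] is assumed
+// to return a set.Set[string]):
+//
+//	e := New(len(items), 4, nil) // 4 workers, no metrics
+//	for _, item := range items {
+//		item := item // capture loop variable
+//		e.Run(stateKeysOf(item), func() error { return process(item) })
+//	}
+//	if err := e.Wait(); err != nil {
+//		// a task failed or [Stop] was called
+//	}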
-func (e *Executor) Run(conflicts set.Set[string], f func()) { +func (e *Executor) Run(conflicts set.Set[string], f func() error) { e.l.Lock() defer e.l.Unlock() @@ -119,13 +140,27 @@ func (e *Executor) Run(conflicts set.Set[string], f func()) { if t.dependencies == nil || t.dependencies.Len() == 0 { t.dependencies = nil // free memory e.executable <- t + if e.metrics != nil { + e.metrics.RecordExecutable() + } + return + } + if e.metrics != nil { + e.metrics.RecordBlocked() } } +func (e *Executor) Stop() { + e.stopOnce.Do(func() { + e.err = ErrStopped + close(e.stop) + }) +} + // Wait returns as soon as all enqueued [f] are executed. // // You should not call [Run] after [Wait] is called. -func (e *Executor) Wait() { +func (e *Executor) Wait() error { e.l.Lock() e.done = true if e.completed == len(e.tasks) { @@ -135,4 +170,5 @@ func (e *Executor) Wait() { } e.l.Unlock() e.wg.Wait() + return e.err } diff --git a/executor/executor_test.go b/executor/executor_test.go index bd91114ee9..6725c41fda 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -4,6 +4,7 @@ package executor import ( + "errors" "sync" "testing" "time" @@ -18,7 +19,7 @@ func TestExecutorNoConflicts(t *testing.T) { require = require.New(t) l sync.Mutex completed = make([]int, 0, 100) - e = New(100, 4) + e = New(100, 4, nil) canWait = make(chan struct{}) ) for i := 0; i < 100; i++ { @@ -27,17 +28,18 @@ func TestExecutorNoConflicts(t *testing.T) { s.Add(ids.GenerateTestID().String()) } ti := i - e.Run(s, func() { + e.Run(s, func() error { l.Lock() completed = append(completed, ti) if len(completed) == 100 { close(canWait) } l.Unlock() + return nil }) } <-canWait - e.Wait() // no task running + require.NoError(e.Wait()) // no task running require.Len(completed, 100) } @@ -46,7 +48,7 @@ func TestExecutorNoConflictsSlow(t *testing.T) { require = require.New(t) l sync.Mutex completed = make([]int, 0, 100) - e = New(100, 4) + e = New(100, 4, nil) ) for i := 0; i < 100; i++ { s := set.NewSet[string](i + 1) @@ -54,16 +56,17 @@ func TestExecutorNoConflictsSlow(t *testing.T) { s.Add(ids.GenerateTestID().String()) } ti := i - e.Run(s, func() { + e.Run(s, func() error { if ti == 0 { time.Sleep(3 * time.Second) } l.Lock() completed = append(completed, ti) l.Unlock() + return nil }) } - e.Wait() // existing task is running + require.NoError(e.Wait()) // existing task is running require.Len(completed, 100) require.Equal(0, completed[99]) } @@ -74,7 +77,7 @@ func TestExecutorSimpleConflict(t *testing.T) { conflictKey = ids.GenerateTestID().String() l sync.Mutex completed = make([]int, 0, 100) - e = New(100, 4) + e = New(100, 4, nil) ) for i := 0; i < 100; i++ { s := set.NewSet[string](i + 1) @@ -85,7 +88,7 @@ func TestExecutorSimpleConflict(t *testing.T) { s.Add(conflictKey) } ti := i - e.Run(s, func() { + e.Run(s, func() error { if ti == 0 { time.Sleep(3 * time.Second) } @@ -93,9 +96,10 @@ func TestExecutorSimpleConflict(t *testing.T) { l.Lock() completed = append(completed, ti) l.Unlock() + return nil }) } - e.Wait() + require.NoError(e.Wait()) require.Equal([]int{0, 10, 20, 30, 40, 50, 60, 70, 80, 90}, completed[90:]) } @@ -106,7 +110,7 @@ func TestExecutorMultiConflict(t *testing.T) { conflictKey2 = ids.GenerateTestID().String() l sync.Mutex completed = make([]int, 0, 100) - e = New(100, 4) + e = New(100, 4, nil) ) for i := 0; i < 100; i++ { s := set.NewSet[string](i + 1) @@ -120,7 +124,7 @@ func TestExecutorMultiConflict(t *testing.T) { s.Add(conflictKey2) } ti := i - e.Run(s, func() { + e.Run(s, func() error { if 
ti == 0 { time.Sleep(3 * time.Second) } @@ -131,8 +135,64 @@ func TestExecutorMultiConflict(t *testing.T) { l.Lock() completed = append(completed, ti) l.Unlock() + return nil }) } - e.Wait() + require.NoError(e.Wait()) require.Equal([]int{0, 10, 15, 20, 30, 40, 50, 60, 70, 80, 90}, completed[89:]) } + +func TestEarlyExit(t *testing.T) { + var ( + require = require.New(t) + l sync.Mutex + completed = make([]int, 0, 500) + e = New(500, 4, nil) + terr = errors.New("uh oh") + ) + for i := 0; i < 500; i++ { + s := set.NewSet[string](i + 1) + for k := 0; k < i+1; k++ { + s.Add(ids.GenerateTestID().String()) + } + ti := i + e.Run(s, func() error { + l.Lock() + completed = append(completed, ti) + l.Unlock() + if ti == 200 { + return terr + } + return nil + }) + } + require.True(len(completed) < 500) + require.ErrorIs(e.Wait(), terr) // no task running +} + +func TestStop(t *testing.T) { + var ( + require = require.New(t) + l sync.Mutex + completed = make([]int, 0, 500) + e = New(500, 4, nil) + ) + for i := 0; i < 500; i++ { + s := set.NewSet[string](i + 1) + for k := 0; k < i+1; k++ { + s.Add(ids.GenerateTestID().String()) + } + ti := i + e.Run(s, func() error { + l.Lock() + completed = append(completed, ti) + l.Unlock() + if ti == 200 { + e.Stop() + } + return nil + }) + } + require.True(len(completed) < 500) + require.ErrorIs(e.Wait(), ErrStopped) // no task running +} diff --git a/tstate/tstate.go b/tstate/tstate.go index 57d28f19b2..3aaa1d67a1 100644 --- a/tstate/tstate.go +++ b/tstate/tstate.go @@ -5,14 +5,11 @@ package tstate import ( "context" - "errors" + "sync" - "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/trace" "github.com/ava-labs/avalanchego/utils/maybe" - "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/x/merkledb" - "github.com/ava-labs/hypersdk/keys" "github.com/ava-labs/hypersdk/state" "go.opentelemetry.io/otel/attribute" oteltrace "go.opentelemetry.io/otel/trace" @@ -26,30 +23,11 @@ type op struct { pastChanged bool } -type cacheItem struct { - Value []byte - Exists bool -} - // TState defines a struct for storing temporary state. type TState struct { + l sync.RWMutex + ops int changedKeys map[string]maybe.Maybe[[]byte] - fetchCache map[string]*cacheItem // in case we evict and want to re-fetch - - // We don't differentiate between read and write scope. - scope set.Set[string] // stores a list of managed keys in the TState struct - scopeStorage map[string][]byte - - // Ops is a record of all operations performed on [TState]. Tracking - // operations allows for reverting state to a certain point-in-time. - ops []*op - - // Store which keys are modified and how large their values were. Reset - // whenever setting scope. - canCreate bool - creations map[string]uint16 - coldModifications map[string]uint16 - warmModifications map[string]uint16 } // New returns a new instance of TState. Initializes the storage and changedKeys @@ -57,245 +35,49 @@ type TState struct { func New(changedSize int) *TState { return &TState{ changedKeys: make(map[string]maybe.Maybe[[]byte], changedSize), - - fetchCache: map[string]*cacheItem{}, - - ops: make([]*op, 0, changedSize), - - canCreate: true, } } -// GetValue returns the value associated from tempStorage with the -// associated [key]. If [key] does not exist in readScope or if it is not found -// in storage an error is returned. 
-func (ts *TState) GetValue(ctx context.Context, key []byte) ([]byte, error) { - if !ts.checkScope(ctx, key) { - return nil, ErrKeyNotSpecified - } - k := string(key) - v, _, exists := ts.getValue(ctx, k) - if !exists { - return nil, database.ErrNotFound - } - return v, nil -} +func (ts *TState) getChangedValue(_ context.Context, key string) ([]byte, bool, bool) { + ts.l.RLock() + defer ts.l.RUnlock() -// Exists returns whether or not the associated [key] is present. -func (ts *TState) Exists(ctx context.Context, key []byte) (bool, bool, error) { - if !ts.checkScope(ctx, key) { - return false, false, ErrKeyNotSpecified - } - k := string(key) - _, changed, exists := ts.getValue(ctx, k) - return changed, exists, nil -} - -func (ts *TState) getValue(_ context.Context, key string) ([]byte, bool, bool) { if v, ok := ts.changedKeys[key]; ok { if v.IsNothing() { return nil, true, false } return v.Value(), true, true } - v, ok := ts.scopeStorage[key] - if !ok { - return nil, false, false - } - return v, false, true + return nil, false, false } -// FetchAndSetScope updates ts to include the [db] values associated with [keys]. -// FetchAndSetScope then sets the scope of ts to [keys]. If a key exists in -// ts.fetchCache set the key's value to the value from cache. -// -// If possible, this function should be avoided and state should be prefetched (much faster). -func (ts *TState) FetchAndSetScope(ctx context.Context, keys set.Set[string], im state.Immutable) error { - ts.scopeStorage = map[string][]byte{} - for key := range keys { - if val, ok := ts.fetchCache[key]; ok { - if val.Exists { - ts.scopeStorage[key] = val.Value - } - continue - } - v, err := im.GetValue(ctx, []byte(key)) - if errors.Is(err, database.ErrNotFound) { - ts.fetchCache[key] = &cacheItem{Exists: false} - continue - } - if err != nil { - return err - } - ts.fetchCache[key] = &cacheItem{Value: v, Exists: true} - ts.scopeStorage[key] = v - } - ts.scope = keys - ts.creations = map[string]uint16{} - ts.coldModifications = map[string]uint16{} - ts.warmModifications = map[string]uint16{} - return nil -} - -// SetReadScope sets the readscope of ts to [keys]. -func (ts *TState) SetScope(_ context.Context, keys set.Set[string], storage map[string][]byte) { - ts.scope = keys - ts.scopeStorage = storage - ts.creations = map[string]uint16{} - ts.coldModifications = map[string]uint16{} - ts.warmModifications = map[string]uint16{} -} - -// DisableCreation causes [Insert] to return an error if -// it would create a new key. This can be useful for constraining -// what a transaction can do during block execution (to allow for -// cheaper fees). -// -// Note, creation defaults to true. -func (ts *TState) DisableCreation() { - ts.canCreate = false -} - -// EnableCreation removes the forcer error case in [Insert] -// if a new key is created. -// -// Note, creation defaults to true. -func (ts *TState) EnableCreation() { - ts.canCreate = true -} - -// checkScope returns whether [k] is in ts.readScope. -func (ts *TState) checkScope(_ context.Context, k []byte) bool { - return ts.scope.Contains(string(k)) -} - -// Insert sets or updates ts.storage[key] to equal {value, false}. -// -// Any bytes passed into [Insert] will be consumed by [TState] and should -// not be modified/referenced after this call. 
-func (ts *TState) Insert(ctx context.Context, key []byte, value []byte) error { - if !ts.checkScope(ctx, key) { - return ErrKeyNotSpecified - } - if !keys.VerifyValue(key, value) { - return ErrInvalidKeyValue - } - k := string(key) - past, changed, exists := ts.getValue(ctx, k) - var err error - if exists { - // If a key is already in [coldModifications], we should still - // consider it a [coldModification] even if it is [changed]. - // This occurs when we modify a key for the second time in - // a single transaction. - // - // If a key is not in [coldModifications] and it is [changed], - // it was either created/modified in a different transaction - // in the block or created in this transaction. - if _, ok := ts.coldModifications[k]; ok || !changed { - err = updateChunks(ts.coldModifications, k, value) - } else { - err = updateChunks(ts.warmModifications, k, value) - } - } else { - if !ts.canCreate { - err = ErrCreationDisabled - } else { - err = updateChunks(ts.creations, k, value) - } - } - if err != nil { - return err - } - ts.ops = append(ts.ops, &op{ - k: k, - pastExists: exists, - pastV: past, - pastChanged: changed, - }) - ts.changedKeys[k] = maybe.Some(value) - return nil -} +func (ts *TState) PendingChanges() int { + ts.l.RLock() + defer ts.l.RUnlock() -// Remove deletes a key-value pair from ts.storage. -func (ts *TState) Remove(ctx context.Context, key []byte) error { - if !ts.checkScope(ctx, key) { - return ErrKeyNotSpecified - } - k := string(key) - past, changed, exists := ts.getValue(ctx, k) - if !exists { - // We do not update modificaations if the key does not exist. - return nil - } - // If a key is already in [coldModifications], we should still - // consider it a [coldModification] even if it is [changed]. - // This occurs when we modify a key for the second time in - // a single transaction. - // - // If a key is not in [coldModifications] and it is [changed], - // it was either created/modified in a different transaction - // in the block or created in this transaction. - var err error - if _, ok := ts.coldModifications[k]; ok || !changed { - err = updateChunks(ts.coldModifications, k, nil) - } else { - err = updateChunks(ts.warmModifications, k, nil) - } - if err != nil { - return err - } - ts.ops = append(ts.ops, &op{ - k: k, - pastExists: true, - pastV: past, - pastChanged: changed, - }) - ts.changedKeys[k] = maybe.Nothing[[]byte]() - return nil + return len(ts.changedKeys) } // OpIndex returns the number of operations done on ts. func (ts *TState) OpIndex() int { - return len(ts.ops) -} + ts.l.RLock() + defer ts.l.RUnlock() -func (ts *TState) PendingChanges() int { - return len(ts.changedKeys) + return ts.ops } -// Rollback restores the TState to before the ts.op[restorePoint] operation. 
-func (ts *TState) Rollback(_ context.Context, restorePoint int) { - for i := len(ts.ops) - 1; i >= restorePoint; i-- { - op := ts.ops[i] - // insert: Modified key for the first time - // - // remove: Removed key that was modified for first time in run - if !op.pastChanged { - delete(ts.changedKeys, op.k) - continue - } - // insert: Modified key for the nth time - // - // remove: Removed key that was previously modified in run - if !op.pastExists { - ts.changedKeys[op.k] = maybe.Nothing[[]byte]() - } else { - ts.changedKeys[op.k] = maybe.Some(op.pastV) - } - } - ts.ops = ts.ops[:restorePoint] -} - -// CreateView creates a slice of [database.BatchOp] of all +// ExportMerkleDBView creates a slice of [database.BatchOp] of all // changes in [TState] that can be used to commit to [merkledb]. -func (ts *TState) CreateView( +func (ts *TState) ExportMerkleDBView( ctx context.Context, - view state.View, t trace.Tracer, //nolint:interfacer + view state.View, ) (merkledb.TrieView, error) { + ts.l.RLock() + defer ts.l.RUnlock() + ctx, span := t.Start( - ctx, "TState.CreateView", + ctx, "TState.ExportMerkleDBView", oteltrace.WithAttributes( attribute.Int("items", len(ts.changedKeys)), ), @@ -304,27 +86,3 @@ func (ts *TState) CreateView( return view.NewView(ctx, merkledb.ViewChanges{MapOps: ts.changedKeys, ConsumeBytes: true}) } - -// updateChunks sets the number of chunks associated with a key that will -// be returned in [KeyOperations]. -func updateChunks(m map[string]uint16, key string, value []byte) error { - chunks, ok := keys.NumChunks(value) - if !ok { - return ErrInvalidKeyValue - } - previousChunks, ok := m[key] - if !ok || chunks > previousChunks { - m[key] = chunks - } - return nil -} - -// KeyOperations returns the number of operations performed since the scope -// was last set. -// -// If an operation is performed more than once during this time, the largest -// operation will be returned here (if 1 chunk then 2 chunks are written to a key, -// this function will return 2 chunks). 
-func (ts *TState) KeyOperations() (map[string]uint16, map[string]uint16, map[string]uint16) { - return ts.creations, ts.coldModifications, ts.warmModifications -} diff --git a/tstate/tstate_test.go b/tstate/tstate_test.go index d69917ee70..1a055344f2 100644 --- a/tstate/tstate_test.go +++ b/tstate/tstate_test.go @@ -55,164 +55,112 @@ func TestGetValue(t *testing.T) { require := require.New(t) ctx := context.TODO() ts := New(10) - // GetValue without Scope perm - _, err := ts.GetValue(ctx, TestKey) - require.ErrorIs(err, ErrKeyNotSpecified, "No error thrown.") + // SetScope - ts.SetScope(ctx, set.Of(string(TestKey)), map[string][]byte{string(TestKey): TestVal}) - val, err := ts.GetValue(ctx, TestKey) - require.NoError(err, "Error getting value.") - require.Equal(TestVal, val, "Value was not saved correctly.") + tsv := ts.NewView(set.Of(string(TestKey)), map[string][]byte{string(TestKey): TestVal}) + val, err := tsv.GetValue(ctx, TestKey) + require.NoError(err, "unable to get value") + require.Equal(TestVal, val, "value was not saved correctly") } func TestGetValueNoStorage(t *testing.T) { require := require.New(t) ctx := context.TODO() ts := New(10) + // SetScope but dont add to storage - ts.SetScope(ctx, set.Of(string(TestKey)), map[string][]byte{}) - _, err := ts.GetValue(ctx, TestKey) - require.ErrorIs(database.ErrNotFound, err, "No error thrown.") + tsv := ts.NewView(set.Of(string(TestKey)), map[string][]byte{}) + _, err := tsv.GetValue(ctx, TestKey) + require.ErrorIs(database.ErrNotFound, err, "data should not exist") } func TestInsertNew(t *testing.T) { require := require.New(t) ctx := context.TODO() ts := New(10) - // Insert before SetScope - err := ts.Insert(ctx, TestKey, TestVal) - require.ErrorIs(ErrKeyNotSpecified, err, "No error thrown.") + // SetScope - ts.SetScope(ctx, set.Of(string(TestKey)), map[string][]byte{}) + tsv := ts.NewView(set.Of(string(TestKey)), map[string][]byte{}) + // Insert key - err = ts.Insert(ctx, TestKey, TestVal) - require.NoError(err, "Error thrown.") - val, err := ts.GetValue(ctx, TestKey) - require.NoError(err, "Error thrown.") - require.Equal(1, ts.OpIndex(), "Insert operation was not added.") - require.Equal(TestVal, val, "Value was not set correctly.") + require.NoError(tsv.Insert(ctx, TestKey, TestVal)) + val, err := tsv.GetValue(ctx, TestKey) + require.NoError(err) + require.Equal(1, tsv.OpIndex(), "insert was not added as an operation") + require.Equal(TestVal, val, "value was not set correctly") + + // Check commit + tsv.Commit() + require.Equal(1, ts.OpIndex(), "insert was not added as an operation") } func TestInsertUpdate(t *testing.T) { require := require.New(t) ctx := context.TODO() ts := New(10) + // SetScope and add - ts.SetScope(ctx, set.Of(string(TestKey)), map[string][]byte{string(TestKey): TestVal}) - require.Equal(0, ts.OpIndex(), "SetStorage operation was not added.") + tsv := ts.NewView(set.Of(string(TestKey)), map[string][]byte{string(TestKey): TestVal}) + require.Equal(0, ts.OpIndex()) + // Insert key newVal := []byte("newVal") - err := ts.Insert(ctx, TestKey, newVal) - require.NoError(err, "Error thrown.") - val, err := ts.GetValue(ctx, TestKey) - require.NoError(err, "Error thrown.") - require.Equal(1, ts.OpIndex(), "Insert operation was not added.") - require.Equal(newVal, val, "Value was not set correctly.") - require.Equal(TestVal, ts.ops[0].pastV, "PastVal was not set correctly.") - require.False(ts.ops[0].pastChanged, "PastVal was not set correctly.") - require.True(ts.ops[0].pastExists, "PastVal was not set correctly.") 
-} - -func TestFetchAndSetScope(t *testing.T) { - require := require.New(t) - ts := New(10) - db := NewTestDB() - ctx := context.TODO() - keys := [][]byte{[]byte("key1"), []byte("key2"), []byte("key3")} - vals := [][]byte{[]byte("val1"), []byte("val2"), []byte("val3")} - keySet := set.NewSet[string](3) - for i, key := range keys { - err := db.Insert(ctx, key, vals[i]) - require.NoError(err, "Error during insert.") - keySet.Add(string(key)) - } - err := ts.FetchAndSetScope(ctx, keySet, db) - require.NoError(err, "Error thrown.") - require.Equal(0, ts.OpIndex(), "Opertions not updated correctly.") - require.Equal(keySet, ts.scope, "Scope not updated correctly.") - // Check values - for i, key := range keys { - val, err := ts.GetValue(ctx, key) - require.NoError(err, "Error getting value.") - require.Equal(vals[i], val, "Value not set correctly.") - } -} + require.NoError(tsv.Insert(ctx, TestKey, newVal)) + val, err := tsv.GetValue(ctx, TestKey) + require.NoError(err) + require.Equal(1, tsv.OpIndex(), "insert operation was not added") + require.Equal(newVal, val, "value was not set correctly") + require.Equal(TestVal, tsv.ops[0].pastV, "PastVal was not set correctly") + require.False(tsv.ops[0].pastChanged, "PastVal was not set correctly") + require.True(tsv.ops[0].pastExists, "PastVal was not set correctly") -func TestFetchAndSetScopeMissingKey(t *testing.T) { - require := require.New(t) - ts := New(10) - db := NewTestDB() - ctx := context.TODO() - keys := [][]byte{[]byte("key1"), []byte("key2"), []byte("key3")} - vals := [][]byte{[]byte("val1"), []byte("val2"), []byte("val3")} - keySet := set.NewSet[string](3) - // Keys[3] not in db - for i, key := range keys[:len(keys)-1] { - keySet.Add(string(key)) - err := db.Insert(ctx, key, vals[i]) - require.NoError(err, "Error during insert.") - } - keySet.Add("key3") - err := ts.FetchAndSetScope(ctx, keySet, db) - require.NoError(err, "Error thrown.") - require.Equal(0, ts.OpIndex(), "Opertions not updated correctly.") - require.Equal(keySet, ts.scope, "Scope not updated correctly.") - // Check values - for i, key := range keys[:len(keys)-1] { - val, err := ts.GetValue(ctx, key) - require.NoError(err, "Error getting value.") - require.Equal(vals[i], val, "Value not set correctly.") - } - _, err = ts.GetValue(ctx, keys[2]) - require.ErrorIs(err, database.ErrNotFound, "Didn't throw correct erro.") + // Check value after commit + tsv.Commit() + tsv = ts.NewView(set.Of(string(TestKey)), map[string][]byte{string(TestKey): TestVal}) + val, err = tsv.GetValue(ctx, TestKey) + require.NoError(err) + require.Equal(newVal, val, "value was not committed correctly") } func TestRemoveInsertRollback(t *testing.T) { require := require.New(t) ts := New(10) ctx := context.TODO() - ts.SetScope(ctx, set.Of(string(TestKey)), map[string][]byte{}) + // Insert - err := ts.Insert(ctx, TestKey, TestVal) - require.NoError(err, "Error from insert.") - v, err := ts.GetValue(ctx, TestKey) + tsv := ts.NewView(set.Of(string(TestKey)), map[string][]byte{}) + require.NoError(tsv.Insert(ctx, TestKey, TestVal)) + v, err := tsv.GetValue(ctx, TestKey) require.NoError(err) require.Equal(TestVal, v) - require.Equal(1, ts.OpIndex(), "Opertions not updated correctly.") + require.Equal(1, tsv.OpIndex(), "opertions not updated correctly") + // Remove - err = ts.Remove(ctx, TestKey) - require.NoError(err, "Error from remove.") - _, err = ts.GetValue(ctx, TestKey) - require.ErrorIs(err, database.ErrNotFound, "Key not deleted from storage.") - require.Equal(2, ts.OpIndex(), "Opertions not 
updated correctly.") + require.NoError(tsv.Remove(ctx, TestKey), "unable to remove TestKey") + _, err = tsv.GetValue(ctx, TestKey) + require.ErrorIs(err, database.ErrNotFound, "Key not deleted from storage") + require.Equal(2, tsv.OpIndex(), "Opertions not updated correctly") + // Insert - err = ts.Insert(ctx, TestKey, TestVal) - require.NoError(err, "Error from insert.") - v, err = ts.GetValue(ctx, TestKey) + require.NoError(tsv.Insert(ctx, TestKey, TestVal)) + v, err = tsv.GetValue(ctx, TestKey) require.NoError(err) require.Equal(TestVal, v) - require.Equal(3, ts.OpIndex(), "Opertions not updated correctly.") - require.Equal(1, ts.PendingChanges()) + require.Equal(3, tsv.OpIndex(), "Opertions not updated correctly") + require.Equal(1, tsv.PendingChanges()) + // Rollback - ts.Rollback(ctx, 2) - _, err = ts.GetValue(ctx, TestKey) - require.ErrorIs(err, database.ErrNotFound, "Key not deleted from storage.") + tsv.Rollback(ctx, 2) + _, err = tsv.GetValue(ctx, TestKey) + require.ErrorIs(err, database.ErrNotFound, "Key not deleted from storage") + // Rollback - ts.Rollback(ctx, 1) - v, err = ts.GetValue(ctx, TestKey) + tsv.Rollback(ctx, 1) + v, err = tsv.GetValue(ctx, TestKey) require.NoError(err) require.Equal(TestVal, v) } -func TestRemoveNotInScope(t *testing.T) { - require := require.New(t) - ts := New(10) - ctx := context.TODO() - // Remove - err := ts.Remove(ctx, TestKey) - require.ErrorIs(err, ErrKeyNotSpecified, "ErrKeyNotSpecified should be thrown.") -} - func TestRestoreInsert(t *testing.T) { require := require.New(t) ts := New(10) @@ -220,28 +168,29 @@ func TestRestoreInsert(t *testing.T) { keys := [][]byte{[]byte("key1"), []byte("key2"), []byte("key3")} keySet := set.Of("key1", "key2", "key3") vals := [][]byte{[]byte("val1"), []byte("val2"), []byte("val3")} - ts.SetScope(ctx, keySet, map[string][]byte{}) + tsv := ts.NewView(keySet, map[string][]byte{}) for i, key := range keys { - err := ts.Insert(ctx, key, vals[i]) - require.NoError(err, "Error inserting.") + require.NoError(tsv.Insert(ctx, key, vals[i])) } updatedVal := []byte("newVal") - err := ts.Insert(ctx, keys[0], updatedVal) - require.NoError(err, "Error inserting.") - require.Equal(len(keys)+1, ts.OpIndex(), "Operations not added properly.") - val, err := ts.GetValue(ctx, keys[0]) - require.NoError(err, "Error getting value.") - require.Equal(updatedVal, val, "Value not updated correctly.") + require.NoError(tsv.Insert(ctx, keys[0], updatedVal)) + require.Equal(len(keys)+1, tsv.OpIndex(), "operations not added properly") + val, err := tsv.GetValue(ctx, keys[0]) + require.NoError(err, "error getting value") + require.Equal(updatedVal, val, "value not updated correctly") + // Rollback inserting updatedVal and key[2] - ts.Rollback(ctx, 2) - require.Equal(2, ts.OpIndex(), "Operations not rolled back properly.") + tsv.Rollback(ctx, 2) + require.Equal(2, tsv.OpIndex(), "operations not rolled back properly") + // Keys[2] was removed - _, err = ts.GetValue(ctx, keys[2]) - require.ErrorIs(err, database.ErrNotFound, "TState read op not rolled back properly.") + _, err = tsv.GetValue(ctx, keys[2]) + require.ErrorIs(err, database.ErrNotFound, "TState read op not rolled back properly") + // Keys[0] was set to past value - val, err = ts.GetValue(ctx, keys[0]) - require.NoError(err, "Error getting value.") - require.Equal(vals[0], val, "Value not rolled back properly.") + val, err = tsv.GetValue(ctx, keys[0]) + require.NoError(err, "error getting value") + require.Equal(vals[0], val, "value not rolled back properly") } func 
TestRestoreDelete(t *testing.T) { @@ -251,34 +200,36 @@ func TestRestoreDelete(t *testing.T) { keys := [][]byte{[]byte("key1"), []byte("key2"), []byte("key3")} keySet := set.Of("key1", "key2", "key3") vals := [][]byte{[]byte("val1"), []byte("val2"), []byte("val3")} - ts.SetScope(ctx, keySet, map[string][]byte{ + tsv := ts.NewView(keySet, map[string][]byte{ string(keys[0]): vals[0], string(keys[1]): vals[1], string(keys[2]): vals[2], }) + // Check scope for i, key := range keys { - val, err := ts.GetValue(ctx, key) - require.NoError(err, "Error getting value.") - require.Equal(vals[i], val, "Value not set correctly.") + val, err := tsv.GetValue(ctx, key) + require.NoError(err, "error getting value") + require.Equal(vals[i], val, "value not set correctly") } + // Remove all for _, key := range keys { - err := ts.Remove(ctx, key) - require.NoError(err, "Error removing from ts.") - _, err = ts.GetValue(ctx, key) - require.ErrorIs(err, database.ErrNotFound, "Value not removed.") + require.NoError(tsv.Remove(ctx, key), "error removing from ts") + _, err := tsv.GetValue(ctx, key) + require.ErrorIs(err, database.ErrNotFound, "value not removed") } - require.Equal(len(keys), ts.OpIndex(), "Operations not added properly.") - require.Equal(3, ts.PendingChanges()) + require.Equal(len(keys), tsv.OpIndex(), "operations not added properly") + require.Equal(3, tsv.PendingChanges()) + // Roll back all removes - ts.Rollback(ctx, 0) - require.Equal(0, ts.OpIndex(), "Operations not rolled back properly.") + tsv.Rollback(ctx, 0) + require.Equal(0, ts.OpIndex(), "operations not rolled back properly") require.Equal(0, ts.PendingChanges()) for i, key := range keys { - val, err := ts.GetValue(ctx, key) - require.NoError(err, "Error getting value.") - require.Equal(vals[i], val, "Value not reset correctly.") + val, err := tsv.GetValue(ctx, key) + require.NoError(err, "error getting value") + require.Equal(vals[i], val, "value not reset correctly") } } @@ -303,42 +254,51 @@ func TestCreateView(t *testing.T) { keys := [][]byte{[]byte("key1"), []byte("key2"), []byte("key3")} keySet := set.Of("key1", "key2", "key3") vals := [][]byte{[]byte("val1"), []byte("val2"), []byte("val3")} - ts.SetScope(ctx, keySet, map[string][]byte{}) + // Add + tsv := ts.NewView(keySet, map[string][]byte{}) for i, key := range keys { - err := ts.Insert(ctx, key, vals[i]) - require.NoError(err, "Error inserting value.") - val, err := ts.GetValue(ctx, key) - require.NoError(err, "Error getting value.") - require.Equal(vals[i], val, "Value not set correctly.") + require.NoError(tsv.Insert(ctx, key, vals[i]), "error inserting value") + val, err := tsv.GetValue(ctx, key) + require.NoError(err, "error getting value") + require.Equal(vals[i], val, "value not set correctly") } - view, err := ts.CreateView(ctx, db, tracer) - require.NoError(err, "Error writing changes.") + tsv.Commit() + + // Create merkle view + view, err := ts.ExportMerkleDBView(ctx, tracer, db) + require.NoError(err, "error writing changes") require.NoError(view.CommitToDB(ctx)) + // Check if db was updated correctly for i, key := range keys { val, _ := db.GetValue(ctx, key) - require.Equal(vals[i], val, "Value not updated in db.") + require.Equal(vals[i], val, "value not updated in db") } + // Remove ts = New(10) - ts.SetScope(ctx, keySet, map[string][]byte{ + tsv = ts.NewView(keySet, map[string][]byte{ string(keys[0]): vals[0], string(keys[1]): vals[1], string(keys[2]): vals[2], }) for _, key := range keys { - err := ts.Remove(ctx, key) - require.NoError(err, "Error removing from 
ts.") - _, err = ts.GetValue(ctx, key) - require.ErrorIs(err, database.ErrNotFound, "Key not removed.") + err := tsv.Remove(ctx, key) + require.NoError(err, "error removing from ts") + _, err = tsv.GetValue(ctx, key) + require.ErrorIs(err, database.ErrNotFound, "key not removed") } - view, err = ts.CreateView(ctx, db, tracer) - require.NoError(err, "Error writing changes.") + tsv.Commit() + + // Create merkle view + view, err = tsv.ts.ExportMerkleDBView(ctx, tracer, db) + require.NoError(err, "error writing changes") require.NoError(view.CommitToDB(ctx)) + // Check if db was updated correctly for _, key := range keys { _, err := db.GetValue(ctx, key) - require.ErrorIs(err, database.ErrNotFound, "Value not removed from db.") + require.ErrorIs(err, database.ErrNotFound, "value not removed from db") } } diff --git a/tstate/tstate_view.go b/tstate/tstate_view.go new file mode 100644 index 0000000000..662dd68748 --- /dev/null +++ b/tstate/tstate_view.go @@ -0,0 +1,265 @@ +// Copyright (C) 2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package tstate + +import ( + "context" + + "github.com/ava-labs/avalanchego/database" + "github.com/ava-labs/avalanchego/utils/maybe" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/hypersdk/keys" +) + +const defaultOps = 4 + +type TStateView struct { + ts *TState + pendingChangedKeys map[string]maybe.Maybe[[]byte] + + // Ops is a record of all operations performed on [TState]. Tracking + // operations allows for reverting state to a certain point-in-time. + ops []*op + + // We don't differentiate between read and write scope. + scope set.Set[string] // stores a list of managed keys in the TState struct + scopeStorage map[string][]byte + + // Store which keys are modified and how large their values were. Reset + // whenever setting scope. + canCreate bool + creations map[string]uint16 + coldModifications map[string]uint16 + warmModifications map[string]uint16 +} + +func (ts *TState) NewView(scope set.Set[string], storage map[string][]byte) *TStateView { + return &TStateView{ + ts: ts, + pendingChangedKeys: make(map[string]maybe.Maybe[[]byte], len(scope)), + ops: make([]*op, 0, defaultOps), + scope: scope, + scopeStorage: storage, + canCreate: true, // default to allowing creation + creations: make(map[string]uint16, len(scope)), + coldModifications: make(map[string]uint16, len(scope)), + warmModifications: make(map[string]uint16, len(scope)), + } +} + +// Rollback restores the TState to before the ts.op[restorePoint] operation. +func (ts *TStateView) Rollback(_ context.Context, restorePoint int) { + for i := len(ts.ops) - 1; i >= restorePoint; i-- { + op := ts.ops[i] + // insert: Modified key for the first time + // + // remove: Removed key that was modified for first time in run + if !op.pastChanged { + delete(ts.pendingChangedKeys, op.k) + continue + } + // insert: Modified key for the nth time + // + // remove: Removed key that was previously modified in run + if !op.pastExists { + ts.pendingChangedKeys[op.k] = maybe.Nothing[[]byte]() + } else { + ts.pendingChangedKeys[op.k] = maybe.Some(op.pastV) + } + } + ts.ops = ts.ops[:restorePoint] +} + +// OpIndex returns the number of operations done on ts. +func (ts *TStateView) OpIndex() int { + return len(ts.ops) +} + +// DisableCreation causes [Insert] to return an error if +// it would create a new key. This can be useful for constraining +// what a transaction can do during block execution (to allow for +// cheaper fees). 
+// +// Note, creation defaults to true. +func (ts *TStateView) DisableCreation() { + ts.canCreate = false +} + +// EnableCreation removes the forcer error case in [Insert] +// if a new key is created. +// +// Note, creation defaults to true. +func (ts *TStateView) EnableCreation() { + ts.canCreate = true +} + +// KeyOperations returns the number of operations performed since the scope +// was last set. +// +// If an operation is performed more than once during this time, the largest +// operation will be returned here (if 1 chunk then 2 chunks are written to a key, +// this function will return 2 chunks). +func (ts *TStateView) KeyOperations() (map[string]uint16, map[string]uint16, map[string]uint16) { + return ts.creations, ts.coldModifications, ts.warmModifications +} + +// checkScope returns whether [k] is in ts.readScope. +func (ts *TStateView) checkScope(_ context.Context, k []byte) bool { + return ts.scope.Contains(string(k)) +} + +// GetValue returns the value associated from tempStorage with the +// associated [key]. If [key] does not exist in readScope or if it is not found +// in storage an error is returned. +func (ts *TStateView) GetValue(ctx context.Context, key []byte) ([]byte, error) { + if !ts.checkScope(ctx, key) { + return nil, ErrKeyNotSpecified + } + k := string(key) + v, _, exists := ts.getValue(ctx, k) + if !exists { + return nil, database.ErrNotFound + } + return v, nil +} + +// Exists returns whether or not the associated [key] is present. +func (ts *TStateView) Exists(ctx context.Context, key []byte) (bool, bool, error) { + if !ts.checkScope(ctx, key) { + return false, false, ErrKeyNotSpecified + } + k := string(key) + _, changed, exists := ts.getValue(ctx, k) + return changed, exists, nil +} + +func (ts *TStateView) getValue(ctx context.Context, key string) ([]byte, bool, bool) { + if v, ok := ts.pendingChangedKeys[key]; ok { + if v.IsNothing() { + return nil, true, false + } + return v.Value(), true, true + } + if v, changed, exists := ts.ts.getChangedValue(ctx, key); changed { + return v, true, exists + } + if v, ok := ts.scopeStorage[key]; ok { + return v, false, true + } + return nil, false, false +} + +// Insert sets or updates ts.storage[key] to equal {value, false}. +// +// Any bytes passed into [Insert] will be consumed by [TState] and should +// not be modified/referenced after this call. +func (ts *TStateView) Insert(ctx context.Context, key []byte, value []byte) error { + if !ts.checkScope(ctx, key) { + return ErrKeyNotSpecified + } + if !keys.VerifyValue(key, value) { + return ErrInvalidKeyValue + } + k := string(key) + past, changed, exists := ts.getValue(ctx, k) + var err error + if exists { + // If a key is already in [coldModifications], we should still + // consider it a [coldModification] even if it is [changed]. + // This occurs when we modify a key for the second time in + // a single transaction. + // + // If a key is not in [coldModifications] and it is [changed], + // it was either created/modified in a different transaction + // in the block or created in this transaction. 
+ if _, ok := ts.coldModifications[k]; ok || !changed { + err = updateChunks(ts.coldModifications, k, value) + } else { + err = updateChunks(ts.warmModifications, k, value) + } + } else { + if !ts.canCreate { + err = ErrCreationDisabled + } else { + err = updateChunks(ts.creations, k, value) + } + } + if err != nil { + return err + } + ts.ops = append(ts.ops, &op{ + k: k, + pastExists: exists, + pastV: past, + pastChanged: changed, + }) + ts.pendingChangedKeys[k] = maybe.Some(value) + return nil +} + +// Remove deletes a key-value pair from ts.storage. +func (ts *TStateView) Remove(ctx context.Context, key []byte) error { + if !ts.checkScope(ctx, key) { + return ErrKeyNotSpecified + } + k := string(key) + past, changed, exists := ts.getValue(ctx, k) + if !exists { + // We do not update modificaations if the key does not exist. + return nil + } + // If a key is already in [coldModifications], we should still + // consider it a [coldModification] even if it is [changed]. + // This occurs when we modify a key for the second time in + // a single transaction. + // + // If a key is not in [coldModifications] and it is [changed], + // it was either created/modified in a different transaction + // in the block or created in this transaction. + var err error + if _, ok := ts.coldModifications[k]; ok || !changed { + err = updateChunks(ts.coldModifications, k, nil) + } else { + err = updateChunks(ts.warmModifications, k, nil) + } + if err != nil { + return err + } + ts.ops = append(ts.ops, &op{ + k: k, + pastExists: true, + pastV: past, + pastChanged: changed, + }) + ts.pendingChangedKeys[k] = maybe.Nothing[[]byte]() + return nil +} + +func (ts *TStateView) PendingChanges() int { + return len(ts.pendingChangedKeys) +} + +func (ts *TStateView) Commit() { + ts.ts.l.Lock() + defer ts.ts.l.Unlock() + + for k, v := range ts.pendingChangedKeys { + ts.ts.changedKeys[k] = v + } + ts.ts.ops += len(ts.ops) +} + +// updateChunks sets the number of chunks associated with a key that will +// be returned in [KeyOperations]. +func updateChunks(m map[string]uint16, key string, value []byte) error { + chunks, ok := keys.NumChunks(value) + if !ok { + return ErrInvalidKeyValue + } + previousChunks, ok := m[key] + if !ok || chunks > previousChunks { + m[key] = chunks + } + return nil +} diff --git a/vm/dependencies.go b/vm/dependencies.go index f5151ace84..886d887e55 100644 --- a/vm/dependencies.go +++ b/vm/dependencies.go @@ -26,15 +26,10 @@ type Handlers map[string]*common.HTTPHandler type Config interface { GetTraceConfig() *trace.Config - // Parallelism is split between signature verification - // and root generation (50/50), where any odd cores - // are added to signature verification. - // - // These operations are typically both done at the same time - // and will cause CPU thrashing if both given full access to all - // cores. 
- GetParallelism() int GetMempoolSize() int + GetSignatureVerificationCores() int + GetRootGenerationCores() int + GetTransactionExecutionCores() int GetMempoolPayerSize() int GetMempoolExemptPayers() [][]byte GetVerifySignatures() bool diff --git a/vm/metrics.go b/vm/metrics.go index fc1b09060a..cd6177ee79 100644 --- a/vm/metrics.go +++ b/vm/metrics.go @@ -6,38 +6,59 @@ package vm import ( "github.com/ava-labs/avalanchego/utils/metric" "github.com/ava-labs/avalanchego/utils/wrappers" + "github.com/ava-labs/hypersdk/executor" "github.com/prometheus/client_golang/prometheus" ) +type executorMetrics struct { + blocked prometheus.Counter + executable prometheus.Counter +} + +func (em *executorMetrics) RecordBlocked() { + em.blocked.Inc() +} + +func (em *executorMetrics) RecordExecutable() { + em.executable.Inc() +} + type Metrics struct { - txsSubmitted prometheus.Counter // includes gossip - txsReceived prometheus.Counter - seenTxsReceived prometheus.Counter - txsGossiped prometheus.Counter - txsVerified prometheus.Counter - txsAccepted prometheus.Counter - stateChanges prometheus.Counter - stateOperations prometheus.Counter - buildCapped prometheus.Counter - emptyBlockBuilt prometheus.Counter - clearedMempool prometheus.Counter - deletedBlocks prometheus.Counter - blocksFromDisk prometheus.Counter - blocksHeightsFromDisk prometheus.Counter - mempoolSize prometheus.Gauge - bandwidthPrice prometheus.Gauge - computePrice prometheus.Gauge - storageReadPrice prometheus.Gauge - storageCreatePrice prometheus.Gauge - storageModifyPrice prometheus.Gauge - rootCalculated metric.Averager - waitRoot metric.Averager - waitSignatures metric.Averager - blockBuild metric.Averager - blockParse metric.Averager - blockVerify metric.Averager - blockAccept metric.Averager - blockProcess metric.Averager + txsSubmitted prometheus.Counter // includes gossip + txsReceived prometheus.Counter + seenTxsReceived prometheus.Counter + txsGossiped prometheus.Counter + txsVerified prometheus.Counter + txsAccepted prometheus.Counter + stateChanges prometheus.Counter + stateOperations prometheus.Counter + buildCapped prometheus.Counter + emptyBlockBuilt prometheus.Counter + clearedMempool prometheus.Counter + deletedBlocks prometheus.Counter + blocksFromDisk prometheus.Counter + blocksHeightsFromDisk prometheus.Counter + executorBuildBlocked prometheus.Counter + executorBuildExecutable prometheus.Counter + executorVerifyBlocked prometheus.Counter + executorVerifyExecutable prometheus.Counter + mempoolSize prometheus.Gauge + bandwidthPrice prometheus.Gauge + computePrice prometheus.Gauge + storageReadPrice prometheus.Gauge + storageCreatePrice prometheus.Gauge + storageModifyPrice prometheus.Gauge + rootCalculated metric.Averager + waitRoot metric.Averager + waitSignatures metric.Averager + blockBuild metric.Averager + blockParse metric.Averager + blockVerify metric.Averager + blockAccept metric.Averager + blockProcess metric.Averager + + executorBuildRecorder executor.Metrics + executorVerifyRecorder executor.Metrics } func newMetrics() (*prometheus.Registry, *Metrics, error) { @@ -187,6 +208,26 @@ func newMetrics() (*prometheus.Registry, *Metrics, error) { Name: "block_heights_from_disk", Help: "number of block heights attempted to load from disk", }), + executorBuildBlocked: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "chain", + Name: "executor_build_blocked", + Help: "executor tasks blocked during build", + }), + executorBuildExecutable: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "chain", + 
Name: "executor_build_executable", + Help: "executor tasks executable during build", + }), + executorVerifyBlocked: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "chain", + Name: "executor_verify_blocked", + Help: "executor tasks blocked during verify", + }), + executorVerifyExecutable: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "chain", + Name: "executor_verify_executable", + Help: "executor tasks executable during verify", + }), mempoolSize: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "chain", Name: "mempool_size", @@ -226,6 +267,9 @@ func newMetrics() (*prometheus.Registry, *Metrics, error) { blockAccept: blockAccept, blockProcess: blockProcess, } + m.executorBuildRecorder = &executorMetrics{blocked: m.executorBuildBlocked, executable: m.executorBuildExecutable} + m.executorVerifyRecorder = &executorMetrics{blocked: m.executorVerifyBlocked, executable: m.executorVerifyExecutable} + errs := wrappers.Errs{} errs.Add( r.Register(m.txsSubmitted), @@ -243,6 +287,10 @@ func newMetrics() (*prometheus.Registry, *Metrics, error) { r.Register(m.deletedBlocks), r.Register(m.blocksFromDisk), r.Register(m.blocksHeightsFromDisk), + r.Register(m.executorBuildBlocked), + r.Register(m.executorBuildExecutable), + r.Register(m.executorVerifyBlocked), + r.Register(m.executorVerifyExecutable), r.Register(m.bandwidthPrice), r.Register(m.computePrice), r.Register(m.storageReadPrice), diff --git a/vm/resolutions.go b/vm/resolutions.go index bc3bc19af0..e5d987876f 100644 --- a/vm/resolutions.go +++ b/vm/resolutions.go @@ -20,6 +20,7 @@ import ( "github.com/ava-labs/hypersdk/builder" "github.com/ava-labs/hypersdk/chain" + "github.com/ava-labs/hypersdk/executor" "github.com/ava-labs/hypersdk/gossiper" "github.com/ava-labs/hypersdk/workers" ) @@ -479,3 +480,15 @@ func (vm *VM) UnitPrices(context.Context) (chain.Dimensions, error) { } return chain.NewFeeManager(v).UnitPrices(), nil } + +func (vm *VM) GetTransactionExecutionCores() int { + return vm.config.GetTransactionExecutionCores() +} + +func (vm *VM) GetExecutorBuildRecorder() executor.Metrics { + return vm.metrics.executorBuildRecorder +} + +func (vm *VM) GetExecutorVerifyRecorder() executor.Metrics { + return vm.metrics.executorVerifyRecorder +} diff --git a/vm/vm.go b/vm/vm.go index dc252d894f..f761a483c2 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -24,7 +24,6 @@ import ( "github.com/ava-labs/avalanchego/trace" "github.com/ava-labs/avalanchego/utils" "github.com/ava-labs/avalanchego/utils/crypto/bls" - "github.com/ava-labs/avalanchego/utils/math" "github.com/ava-labs/avalanchego/utils/profiler" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/version" @@ -203,14 +202,12 @@ func (vm *VM) Initialize( } // Instantiate DBs - parallelism := vm.config.GetParallelism() - rootGenParallelism := math.Max(parallelism/2, 1) merkleRegistry := prometheus.NewRegistry() vm.stateDB, err = merkledb.New(ctx, vm.rawStateDB, merkledb.Config{ BranchFactor: vm.genesis.GetStateBranchFactor(), // RootGenConcurrency limits the number of goroutines // that will be used across all concurrent root generations. 
- RootGenConcurrency: uint(rootGenParallelism), + RootGenConcurrency: uint(vm.config.GetRootGenerationCores()), EvictionBatchSize: uint(vm.config.GetStateEvictionBatchSize()), HistoryLength: uint(vm.config.GetStateHistoryLength()), IntermediateNodeCacheSize: uint(vm.config.GetIntermediateNodeCacheSize()), @@ -229,8 +226,7 @@ func (vm *VM) Initialize( // // If [parallelism] is odd, we assign the extra // core to signature verification. - sigParallelism := math.Max(parallelism-rootGenParallelism, 1) - vm.sigWorkers = workers.NewParallel(sigParallelism, 100) // TODO: make job backlog a const + vm.sigWorkers = workers.NewParallel(vm.config.GetSignatureVerificationCores(), 100) // TODO: make job backlog a const // Init channels before initializing other structs vm.toEngine = toEngine
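
Editor's note, not part of the patch: the sketch below is a minimal illustration of how the reworked `executor` package introduced above is driven, using `New(items, concurrency, metrics)`, `Run(conflicts, func() error)`, and `Wait() error`, and mirroring the pattern in `executor/executor_test.go`. The conflict keys and task bodies are hypothetical; passing `nil` for the metrics recorder is allowed, as the unit tests do.

```go
package main

import (
	"fmt"

	"github.com/ava-labs/avalanchego/utils/set"

	"github.com/ava-labs/hypersdk/executor"
)

func main() {
	// 8 tasks executed by 4 worker goroutines; nil metrics is allowed, as in the unit tests.
	e := executor.New(8, 4, nil)

	results := make([]int, 8)
	for i := 0; i < 8; i++ {
		i := i
		// Tasks that share a conflict key are executed in the order they were
		// queued; everything else runs concurrently.
		conflicts := set.Of(fmt.Sprintf("account-%d", i%2)) // hypothetical state keys
		e.Run(conflicts, func() error {
			// Returning a non-nil error stops the executor and is surfaced by Wait.
			results[i] = i * i
			return nil
		})
	}

	if err := e.Wait(); err != nil { // first task error, or ErrStopped after Stop()
		panic(err)
	}
	fmt.Println(results)
}
```

Tasks are enqueued under a lock and dispatched to a bounded channel, so `Run` never blocks on worker progress; `Wait` must only be called once all tasks have been enqueued.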
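
A second editor sketch, also not part of the patch, shows the `TStateView` lifecycle added in `tstate/tstate_view.go`: a per-transaction view is opened over its declared scope and prefetched storage, mutated, and then committed into the shared `TState`, from which the block processor can export a merkledb view. The key and value below are toy placeholders assumed to satisfy `keys.VerifyValue`, which `Insert` uses to validate writes.

```go
package main

import (
	"context"
	"fmt"

	"github.com/ava-labs/avalanchego/utils/set"

	"github.com/ava-labs/hypersdk/tstate"
)

func main() {
	ctx := context.Background()

	// Shared change set for the whole block.
	ts := tstate.New(16)

	// Per-transaction view over its declared scope and prefetched storage
	// (the key/value pair is a toy placeholder assumed to pass keys.VerifyValue).
	key := "balance-0x01"
	tsv := ts.NewView(set.Of(key), map[string][]byte{
		key: []byte("100"),
	})

	// Mutations are recorded as ops so they can be rolled back on failure.
	if err := tsv.Insert(ctx, []byte(key), []byte("90")); err != nil {
		panic(err)
	}
	fmt.Println("view ops:", tsv.OpIndex(), "pending:", tsv.PendingChanges())

	// On success, merge the view's pending changes into the shared TState;
	// the block processor can then call ts.ExportMerkleDBView to build a
	// merkledb view from all committed changes.
	tsv.Commit()
	fmt.Println("block pending changes:", ts.PendingChanges())
}
```

Keeping pending changes in the view and merging them only on `Commit` is what allows many views, one per transaction, to be built concurrently by the executor while the shared `TState` is guarded by a single lock.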