diff --git a/vms/platformvm/block/builder/builder.go b/vms/platformvm/block/builder/builder.go
index 79c05992bf53..dd46253aeb4e 100644
--- a/vms/platformvm/block/builder/builder.go
+++ b/vms/platformvm/block/builder/builder.go
@@ -7,13 +7,13 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"sync"
 	"time"
 
 	"go.uber.org/zap"
 
 	"github.com/ava-labs/avalanchego/ids"
 	"github.com/ava-labs/avalanchego/snow/consensus/snowman"
-	"github.com/ava-labs/avalanchego/utils/timer"
 	"github.com/ava-labs/avalanchego/utils/timer/mockable"
 	"github.com/ava-labs/avalanchego/utils/units"
 	"github.com/ava-labs/avalanchego/vms/platformvm/block"
@@ -33,24 +33,31 @@ const targetBlockSize = 128 * units.KiB
 var (
 	_ Builder = (*builder)(nil)
 
-	ErrEndOfTime       = errors.New("program time is suspiciously far in the future")
-	ErrNoPendingBlocks = errors.New("no pending blocks")
+	ErrEndOfTime                 = errors.New("program time is suspiciously far in the future")
+	ErrNoPendingBlocks           = errors.New("no pending blocks")
+	errMissingPreferredState     = errors.New("missing preferred block state")
+	errCalculatingNextStakerTime = errors.New("failed calculating next staker time")
 )
 
 type Builder interface {
 	mempool.Mempool
 
-	// ResetBlockTimer schedules a timer to notify the consensus engine once
-	// there is a block ready to be built. If a block is ready to be built when
-	// this function is called, the engine will be notified directly.
+	// StartBlockTimer starts to issue block creation requests to advance the
+	// chain timestamp.
+	StartBlockTimer()
+
+	// ResetBlockTimer forces the block timer to recalculate when it should
+	// advance the chain timestamp.
 	ResetBlockTimer()
 
-	// BuildBlock is called on timer clock to attempt to create
-	// next block
-	BuildBlock(context.Context) (snowman.Block, error)
+	// ShutdownBlockTimer stops block creation requests to advance the chain
+	// timestamp.
+	//
+	// Invariant: Assumes the context lock is held when calling.
+	ShutdownBlockTimer()
 
-	// Shutdown cleanly shuts Builder down
-	Shutdown()
+	// BuildBlock can be called to attempt to create a new block
+	BuildBlock(context.Context) (snowman.Block, error)
 }
 
 // builder implements a simple builder to convert txs into valid blocks
@@ -61,10 +68,14 @@ type builder struct {
 	txExecutorBackend *txexecutor.Backend
 	blkManager        blockexecutor.Manager
 
-	// This timer goes off when it is time for the next validator to add/leave
-	// the validator set. When it goes off ResetTimer() is called, potentially
-	// triggering creation of a new block.
-	timer *timer.Timer
+	// resetTimer is used to signal that the block builder timer should update
+	// when it will trigger building of a block.
+	resetTimer chan struct{}
+
+	// closed is closed by ShutdownBlockTimer to stop the block timer goroutine;
+	// closeOnce ensures it is closed at most once.
+	closed    chan struct{}
+	closeOnce sync.Once
 }
 
 func New(
@@ -73,17 +84,132 @@ func New(
 	txExecutorBackend *txexecutor.Backend,
 	blkManager blockexecutor.Manager,
 ) Builder {
-	builder := &builder{
+	return &builder{
 		Mempool:           mempool,
 		txBuilder:         txBuilder,
 		txExecutorBackend: txExecutorBackend,
 		blkManager:        blkManager,
+		resetTimer:        make(chan struct{}, 1),
+		closed:            make(chan struct{}),
+	}
+}
+
+// StartBlockTimer starts a goroutine that requests block building whenever the
+// chain timestamp needs to advance. The goroutine exits once
+// [ShutdownBlockTimer] is called or a fatal error is logged.
+func (b *builder) StartBlockTimer() {
+	go func() {
+		timer := time.NewTimer(0)
+		defer timer.Stop()
+
+		for {
+			// Invariant: The [timer] is not stopped.
+			select {
+			case <-timer.C:
+			case <-b.resetTimer:
+				if !timer.Stop() {
+					<-timer.C
+				}
+			case <-b.closed:
+				return
+			}
+
+			// Note: Because the context lock is not held here, it is possible
+			// that [ShutdownBlockTimer] is called concurrently with this
+			// execution.
+			for {
+				duration, err := b.durationToSleep()
+				if err != nil {
+					b.txExecutorBackend.Ctx.Log.Error("block builder encountered a fatal error",
+						zap.Error(err),
+					)
+					return
+				}
+
+				if duration > 0 {
+					timer.Reset(duration)
+					break
+				}
+
+				// [durationToSleep] reports a zero duration once
+				// [ShutdownBlockTimer] has been called. Exit instead of
+				// requesting a block from a builder that was shut down.
+				select {
+				case <-b.closed:
+					return
+				default:
+				}
+
+				// Block needs to be issued to advance time.
+				b.Mempool.RequestBuildBlock(true /*=emptyBlockPermitted*/)
+
+				// Invariant: ResetBlockTimer is guaranteed to be called after
+				// [durationToSleep] returns a value <= 0. This is because we
+				// are guaranteed to attempt to build a block. After building a
+				// valid block, the chain will have its preference updated which
+				// may change the duration to sleep and trigger a timer reset.
+				select {
+				case <-b.resetTimer:
+				case <-b.closed:
+					return
+				}
+			}
+		}
+	}()
+}
+
+// durationToSleep returns how long the block timer should wait before a block
+// must be requested: the time from the backend clock until the preferred
+// state's next staker change. A non-positive duration means a block should be
+// requested now. A zero duration with a nil error is also returned once
+// [ShutdownBlockTimer] has been called.
+func (b *builder) durationToSleep() (time.Duration, error) {
+	// Grabbing the lock here enforces that this function is not called midway
+	// through a state modification.
+	b.txExecutorBackend.Ctx.Lock.Lock()
+	defer b.txExecutorBackend.Ctx.Lock.Unlock()
+
+	// If [ShutdownBlockTimer] was called, we want to exit the block timer
+	// goroutine. We check this with the context lock held because
+	// [ShutdownBlockTimer] is expected to only be called with the context lock
+	// held.
+	select {
+	case <-b.closed:
+		return 0, nil
+	default:
+	}
+
+	preferredID := b.blkManager.Preferred()
+	preferredState, ok := b.blkManager.GetState(preferredID)
+	if !ok {
+		return 0, fmt.Errorf("%w: %s", errMissingPreferredState, preferredID)
+	}
+
+	nextStakerChangeTime, err := txexecutor.GetNextStakerChangeTime(preferredState)
+	if err != nil {
+		return 0, fmt.Errorf("%w of %s: %w", errCalculatingNextStakerTime, preferredID, err)
 	}
 
-	builder.timer = timer.NewTimer(builder.setNextBuildBlockTime)
+	now := b.txExecutorBackend.Clk.Time()
+	return nextStakerChangeTime.Sub(now), nil
+}
+
+// ResetBlockTimer asks the block timer goroutine to recalculate when it should
+// next request a block. The request is dropped if one is already pending.
+func (b *builder) ResetBlockTimer() {
+	// Ensure that the timer will be reset at least once.
+	select {
+	case b.resetTimer <- struct{}{}:
+	default:
+	}
+}
 
-	go txExecutorBackend.Ctx.Log.RecoverAndPanic(builder.timer.Dispatch)
-	return builder
+// ShutdownBlockTimer signals the block timer goroutine to exit without waiting
+// for it. Calling it more than once is safe.
+func (b *builder) ShutdownBlockTimer() {
+	b.closeOnce.Do(func() {
+		close(b.closed)
+	})
 }
 
 // BuildBlock builds a block to be added to consensus.
@@ -93,27 +195,18 @@ func (b *builder) BuildBlock(context.Context) (snowman.Block, error) { b.Mempool.DisableAdding() defer func() { b.Mempool.EnableAdding() + // If we need to advance the chain's timestamp in a standard block, but + // we build an invalid block, then we need to re-trigger block building. + // + // TODO: Remove once we are guaranteed to build a valid block. b.ResetBlockTimer() + // If there are still transactions in the mempool, then we need to + // re-trigger block building. + b.Mempool.RequestBuildBlock(false /*=emptyBlockPermitted*/) }() - ctx := b.txExecutorBackend.Ctx - ctx.Log.Debug("starting to attempt to build a block") - - statelessBlk, err := b.buildBlock() - if err != nil { - return nil, err - } - - // Remove selected txs from mempool now that we are returning the block to - // the consensus engine. - txs := statelessBlk.Txs() - b.Mempool.Remove(txs) - return b.blkManager.NewBlock(statelessBlk), nil -} + b.txExecutorBackend.Ctx.Log.Debug("starting to attempt to build a block") -// Returns the block we want to build and issue. -// Only modifies state to remove expired proposal txs. -func (b *builder) buildBlock() (block.Block, error) { // Get the block to build on top of and retrieve the new block's context. preferredID := b.blkManager.Preferred() preferred, err := b.blkManager.GetBlock(preferredID) @@ -131,7 +224,7 @@ func (b *builder) buildBlock() (block.Block, error) { return nil, fmt.Errorf("could not calculate next staker change time: %w", err) } - return buildBlock( + statelessBlk, err := buildBlock( b, preferredID, nextHeight, @@ -139,75 +232,15 @@ func (b *builder) buildBlock() (block.Block, error) { timeWasCapped, preferredState, ) -} - -func (b *builder) Shutdown() { - // There is a potential deadlock if the timer is about to execute a timeout. - // So, the lock must be released before stopping the timer. 
- ctx := b.txExecutorBackend.Ctx - ctx.Lock.Unlock() - b.timer.Stop() - ctx.Lock.Lock() -} - -func (b *builder) ResetBlockTimer() { - // Next time the context lock is released, we can attempt to reset the block - // timer. - b.timer.SetTimeoutIn(0) -} - -func (b *builder) setNextBuildBlockTime() { - ctx := b.txExecutorBackend.Ctx - - // Grabbing the lock here enforces that this function is not called mid-way - // through modifying of the state. - ctx.Lock.Lock() - defer ctx.Lock.Unlock() - - if !b.txExecutorBackend.Bootstrapped.Get() { - ctx.Log.Verbo("skipping block timer reset", - zap.String("reason", "not bootstrapped"), - ) - return - } - - if _, err := b.buildBlock(); err == nil { - // We can build a block now - b.Mempool.RequestBuildBlock(true /*=emptyBlockPermitted*/) - return - } - - // Wake up when it's time to add/remove the next validator/delegator - preferredID := b.blkManager.Preferred() - preferredState, ok := b.blkManager.GetState(preferredID) - if !ok { - // The preferred block should always be a decision block - ctx.Log.Error("couldn't get preferred block state", - zap.Stringer("preferredID", preferredID), - zap.Stringer("lastAcceptedID", b.blkManager.LastAccepted()), - ) - return - } - - nextStakerChangeTime, err := txexecutor.GetNextStakerChangeTime(preferredState) if err != nil { - ctx.Log.Error("couldn't get next staker change time", - zap.Stringer("preferredID", preferredID), - zap.Stringer("lastAcceptedID", b.blkManager.LastAccepted()), - zap.Error(err), - ) - return + return nil, err } - now := b.txExecutorBackend.Clk.Time() - waitTime := nextStakerChangeTime.Sub(now) - ctx.Log.Debug("setting next scheduled event", - zap.Time("nextEventTime", nextStakerChangeTime), - zap.Duration("timeUntil", waitTime), - ) - - // Wake up when it's time to add/remove the next validator - b.timer.SetTimeoutIn(waitTime) + // Remove selected txs from mempool now that we are returning the block to + // the consensus engine. 
+ txs := statelessBlk.Txs() + b.Mempool.Remove(txs) + return b.blkManager.NewBlock(statelessBlk), nil } // [timestamp] is min(max(now, parent timestamp), next staker change time) diff --git a/vms/platformvm/block/builder/builder_test.go b/vms/platformvm/block/builder/builder_test.go index 434d5b7b2552..3a030cdedcce 100644 --- a/vms/platformvm/block/builder/builder_test.go +++ b/vms/platformvm/block/builder/builder_test.go @@ -38,6 +38,7 @@ func TestBlockBuilderAddLocalTx(t *testing.T) { env.ctx.Lock.Lock() defer func() { require.NoError(shutdownEnvironment(env)) + env.ctx.Lock.Unlock() }() // Create a valid transaction @@ -78,6 +79,7 @@ func TestPreviouslyDroppedTxsCanBeReAddedToMempool(t *testing.T) { env.ctx.Lock.Lock() defer func() { require.NoError(shutdownEnvironment(env)) + env.ctx.Lock.Unlock() }() // Create a valid transaction @@ -130,6 +132,7 @@ func TestNoErrorOnUnexpectedSetPreferenceDuringBootstrapping(t *testing.T) { env.isBootstrapped.Set(false) defer func() { require.NoError(shutdownEnvironment(env)) + env.ctx.Lock.Unlock() }() require.True(env.blkManager.SetPreference(ids.GenerateTestID())) // should not panic @@ -322,6 +325,7 @@ func TestBuildBlock(t *testing.T) { env.ctx.Lock.Lock() defer func() { require.NoError(t, shutdownEnvironment(env)) + env.ctx.Lock.Unlock() }() var ( diff --git a/vms/platformvm/block/builder/helpers_test.go b/vms/platformvm/block/builder/helpers_test.go index 36a6822ce3e2..529c06a52d1b 100644 --- a/vms/platformvm/block/builder/helpers_test.go +++ b/vms/platformvm/block/builder/helpers_test.go @@ -194,6 +194,7 @@ func newEnvironment(t *testing.T) *environment { &res.backend, res.blkManager, ) + res.Builder.StartBlockTimer() res.blkManager.SetPreference(genesisID) addSubnet(t, res) @@ -419,7 +420,7 @@ func buildGenesisTest(t *testing.T, ctx *snow.Context) []byte { } func shutdownEnvironment(env *environment) error { - env.Builder.Shutdown() + env.Builder.ShutdownBlockTimer() if env.isBootstrapped.Get() { validatorIDs := 
env.config.Validators.GetValidatorIDs(constants.PrimaryNetworkID) diff --git a/vms/platformvm/block/builder/standard_block_test.go b/vms/platformvm/block/builder/standard_block_test.go index 827d7357728b..74177e1f88b0 100644 --- a/vms/platformvm/block/builder/standard_block_test.go +++ b/vms/platformvm/block/builder/standard_block_test.go @@ -26,6 +26,7 @@ func TestAtomicTxImports(t *testing.T) { env.ctx.Lock.Lock() defer func() { require.NoError(shutdownEnvironment(env)) + env.ctx.Lock.Unlock() }() utxoID := avax.UTXOID{ diff --git a/vms/platformvm/block/executor/rejector.go b/vms/platformvm/block/executor/rejector.go index cfc64b050be4..6b2565288fca 100644 --- a/vms/platformvm/block/executor/rejector.go +++ b/vms/platformvm/block/executor/rejector.go @@ -82,7 +82,7 @@ func (r *rejector) rejectBlock(b block.Block, blockType string) error { } } - r.Mempool.RequestBuildBlock(false) + r.Mempool.RequestBuildBlock(false /*=emptyBlockPermitted*/) return nil } diff --git a/vms/platformvm/network/network.go b/vms/platformvm/network/network.go index 5f4945093d60..4600cce7f581 100644 --- a/vms/platformvm/network/network.go +++ b/vms/platformvm/network/network.go @@ -181,7 +181,7 @@ func (n *network) issueTx(tx *txs.Tx) error { return err } - n.mempool.RequestBuildBlock(false) + n.mempool.RequestBuildBlock(false /*=emptyBlockPermitted*/) return nil } diff --git a/vms/platformvm/vm.go b/vms/platformvm/vm.go index d9898b873137..4c1e10c645d6 100644 --- a/vms/platformvm/vm.go +++ b/vms/platformvm/vm.go @@ -330,7 +330,7 @@ func (vm *VM) onNormalOperationsStarted() error { } // Start the block builder - vm.Builder.ResetBlockTimer() + vm.Builder.StartBlockTimer() return nil } @@ -351,7 +351,7 @@ func (vm *VM) Shutdown(context.Context) error { return nil } - vm.Builder.Shutdown() + vm.Builder.ShutdownBlockTimer() if vm.bootstrapped.Get() { primaryVdrIDs := vm.Validators.GetValidatorIDs(constants.PrimaryNetworkID)