Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
[build-295] Improve Builder submission rate (#70)
Browse files Browse the repository at this point in the history
- Prior to change, builder submissions happened roughly between ~t-11 to ~t+3 where t is the time of slot s
- After change, builder submissions happen roughly between ~t-4 to ~t+3, which is desired

* Add bug fix for rate limit

* Expose environment variables that adjust builder rate limit and burst limit, update submit logic such that submissions happen near end of slot

* Update submit loop function

* Update conditional in resubmit loop

* Update rate limit variable name

* Create constant for default burst on builder rate limit

* Use CLI flags instead of bespoke environment variables for builder rate limit settings

* Fix typo

* Update logs for more data analysis

* Expose builder block resubmit interval as CLI flag and environment variable

* Update variable name for default builder block resubmit interval

* Fix wait time when timestamp is not passed in to address failures in unit tests

* Update README

* Update comments

* Fix when error log occurs

* Update log

* Update check

* Update go mod
  • Loading branch information
Wazzymandias authored May 26, 2023
1 parent f8c6b44 commit 66893df
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 60 deletions.
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ $ geth --help
--builder.bellatrix_fork_version value (default: "0x02000000")
Bellatrix fork version. [$BUILDER_BELLATRIX_FORK_VERSION]
--builder.block_resubmit_interval value (default: "500ms")
Determines the interval at which builder will resubmit block submissions
[$FLASHBOTS_BUILDER_RATE_LIMIT_RESUBMIT_INTERVAL]
--builder.cancellations (default: false)
Enable cancellations for the builder
--builder.dry-run (default: false)
Builder only validates blocks without submission to the relay
Expand All @@ -67,6 +74,16 @@ $ geth --help
--builder.no_bundle_fetcher (default: false)
Disable the bundle fetcher
--builder.rate_limit_duration value (default: "500ms")
Determines rate limit of events processed by builder; a duration string is a
possibly signed sequence of decimal numbers, each with optional fraction and a
unit suffix, such as "300ms", "-1.5h" or "2h45m"
[$FLASHBOTS_BUILDER_RATE_LIMIT_DURATION]
--builder.rate_limit_max_burst value (default: 10)
Determines the maximum number of burst events the builder can accommodate at any
given point in time. [$FLASHBOTS_BUILDER_RATE_LIMIT_MAX_BURST]
--builder.relay_secret_key value (default: "0x2fc12ae741f29701f8e30f5de6350766c020cb80768a0ff01e6838ffd2431e11")
Builder local relay API key used for signing headers [$BUILDER_RELAY_SECRET_KEY]
Expand Down Expand Up @@ -115,6 +132,13 @@ $ geth --help
Environment variables:
```
BUILDER_TX_SIGNING_KEY - private key of the builder used to sign payment transaction, must be the same as the coinbase address
FLASHBOTS_BUILDER_RATE_LIMIT_DURATION - determines rate limit of events processed by builder; a duration string is a
possibly signed sequence of decimal numbers, each with optional fraction and a unit suffix, such as "300ms", "-1.5h" or "2h45m".
FLASHBOTS_BUILDER_RATE_LIMIT_MAX_BURST - determines the maximum number of events the builder can accommodate at any point in time
FLASHBOTS_BUILDER_RATE_LIMIT_RESUBMIT_INTERVAL - determines the interval at which builder will resubmit block submissions
```

## Metrics
Expand Down
96 changes: 70 additions & 26 deletions builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package builder
import (
"context"
"errors"
"fmt"
"math/big"
_ "os"
"sync"
Expand All @@ -26,6 +27,14 @@ import (
"golang.org/x/time/rate"
)

const (
RateLimitIntervalDefault = 500 * time.Millisecond
RateLimitBurstDefault = 10
BlockResubmitIntervalDefault = 500 * time.Millisecond

SubmissionDelaySecondsDefault = 4 * time.Second
)

type PubkeyHex string

type ValidatorData struct {
Expand Down Expand Up @@ -60,6 +69,7 @@ type Builder struct {
builderSecretKey *bls.SecretKey
builderPublicKey boostTypes.PublicKey
builderSigningDomain boostTypes.Domain
builderResubmitInterval time.Duration

limiter *rate.Limiter

Expand All @@ -73,22 +83,33 @@ type Builder struct {

// BuilderArgs is a struct that contains all the arguments needed to create a new Builder
type BuilderArgs struct {
sk *bls.SecretKey
ds flashbotsextra.IDatabaseService
relay IRelay
builderSigningDomain boostTypes.Domain
eth IEthereumService
dryRun bool
ignoreLatePayloadAttributes bool
validator *blockvalidation.BlockValidationAPI
beaconClient IBeaconClient
sk *bls.SecretKey
ds flashbotsextra.IDatabaseService
relay IRelay
builderSigningDomain boostTypes.Domain
builderBlockResubmitInterval time.Duration
eth IEthereumService
dryRun bool
ignoreLatePayloadAttributes bool
validator *blockvalidation.BlockValidationAPI
beaconClient IBeaconClient

limiter *rate.Limiter
}

func NewBuilder(args BuilderArgs) *Builder {
pkBytes := bls.PublicKeyFromSecretKey(args.sk).Compress()
pk := boostTypes.PublicKey{}
pk.FromSlice(pkBytes)

if args.limiter == nil {
args.limiter = rate.NewLimiter(rate.Every(RateLimitIntervalDefault), RateLimitBurstDefault)
}

if args.builderBlockResubmitInterval == 0 {
args.builderBlockResubmitInterval = BlockResubmitIntervalDefault
}

slotCtx, slotCtxCancel := context.WithCancel(context.Background())
return &Builder{
ds: args.ds,
Expand All @@ -101,8 +122,9 @@ func NewBuilder(args BuilderArgs) *Builder {
builderSecretKey: args.sk,
builderPublicKey: pk,
builderSigningDomain: args.builderSigningDomain,
builderResubmitInterval: args.builderBlockResubmitInterval,

limiter: rate.NewLimiter(rate.Every(time.Millisecond), 510),
limiter: args.limiter,
slotCtx: slotCtx,
slotCtxCancel: slotCtxCancel,

Expand All @@ -129,11 +151,25 @@ func (b *Builder) Start() error {
} else if payloadAttributes.Slot == currentSlot {
// Subsequent sse events should only be canonical!
if !b.ignoreLatePayloadAttributes {
b.OnPayloadAttribute(&payloadAttributes)
err := b.OnPayloadAttribute(&payloadAttributes)
if err != nil {
log.Error("error with builder processing on payload attribute",
"latestSlot", currentSlot,
"processedSlot", payloadAttributes.Slot,
"headHash", payloadAttributes.HeadHash.String(),
"error", err)
}
}
} else if payloadAttributes.Slot > currentSlot {
currentSlot = payloadAttributes.Slot
b.OnPayloadAttribute(&payloadAttributes)
err := b.OnPayloadAttribute(&payloadAttributes)
if err != nil {
log.Error("error with builder processing on payload attribute",
"latestSlot", currentSlot,
"processedSlot", payloadAttributes.Slot,
"headHash", payloadAttributes.HeadHash.String(),
"error", err)
}
}
}
}
Expand All @@ -158,7 +194,8 @@ func (b *Builder) onSealedBlock(block *types.Block, blockValue *big.Int, ordersC
}
}

log.Info("submitted block", "slot", attrs.Slot, "value", blockValue.String(), "parent", block.ParentHash, "hash", block.Hash(), "#commitedBundles", len(commitedBundles))
log.Info("submitted block", "slot", attrs.Slot, "value", blockValue.String(), "parent", block.ParentHash,
"hash", block.Hash(), "#commitedBundles", len(commitedBundles))

return nil
}
Expand Down Expand Up @@ -290,17 +327,15 @@ func (b *Builder) OnPayloadAttribute(attrs *types.BuilderPayloadAttributes) erro

vd, err := b.relay.GetValidatorForSlot(attrs.Slot)
if err != nil {
log.Info("could not get validator while submitting block", "err", err, "slot", attrs.Slot)
return err
return fmt.Errorf("could not get validator while submitting block for slot %d - %w", attrs.Slot, err)
}

attrs.SuggestedFeeRecipient = [20]byte(vd.FeeRecipient)
attrs.GasLimit = vd.GasLimit

proposerPubkey, err := boostTypes.HexToPubkey(string(vd.Pubkey))
if err != nil {
log.Error("could not parse pubkey", "err", err, "pubkey", vd.Pubkey)
return err
return fmt.Errorf("could not parse pubkey (%s) - %w", vd.Pubkey, err)
}

if !b.eth.Synced() {
Expand All @@ -309,8 +344,7 @@ func (b *Builder) OnPayloadAttribute(attrs *types.BuilderPayloadAttributes) erro

parentBlock := b.eth.GetBlockByHash(attrs.HeadHash)
if parentBlock == nil {
log.Warn("Block hash not found in blocktree", "head block hash", attrs.HeadHash)
return errors.New("parent block not found in blocktree")
return fmt.Errorf("parent block hash not found in block tree given head block hash %s", attrs.HeadHash)
}

b.slotMu.Lock()
Expand Down Expand Up @@ -362,7 +396,7 @@ func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey boostTy
queueBestEntry blockQueueEntry
)

log.Debug("runBuildingJob", "slot", attrs.Slot, "parent", attrs.HeadHash)
log.Debug("runBuildingJob", "slot", attrs.Slot, "parent", attrs.HeadHash, "payloadTimestamp", uint64(attrs.Timestamp))

submitBestBlock := func() {
queueMu.Lock()
Expand All @@ -378,11 +412,18 @@ func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey boostTy
queueMu.Unlock()
}

// Avoid submitting early into a given slot. For example if slots have 12 second interval, submissions should
// not begin until 8 seconds into the slot.
slotTime := time.Unix(int64(attrs.Timestamp), 0).UTC()
slotSubmitStartTime := slotTime.Add(-SubmissionDelaySecondsDefault)

// Empties queue, submits the best block for current job with rate limit (global for all jobs)
go runResubmitLoop(ctx, b.limiter, queueSignal, submitBestBlock)
go runResubmitLoop(ctx, b.limiter, queueSignal, submitBestBlock, slotSubmitStartTime)

// Populates queue with submissions that increase block profit
blockHook := func(block *types.Block, blockValue *big.Int, ordersCloseTime time.Time, commitedBundles, allBundles []types.SimulatedBundle) {
blockHook := func(block *types.Block, blockValue *big.Int, ordersCloseTime time.Time,
committedBundles, allBundles []types.SimulatedBundle,
) {
if ctx.Err() != nil {
return
}
Expand All @@ -397,7 +438,7 @@ func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey boostTy
blockValue: new(big.Int).Set(blockValue),
ordersCloseTime: ordersCloseTime,
sealedAt: sealedAt,
commitedBundles: commitedBundles,
commitedBundles: committedBundles,
allBundles: allBundles,
}

Expand All @@ -408,9 +449,12 @@ func (b *Builder) runBuildingJob(slotCtx context.Context, proposerPubkey boostTy
}
}

// resubmits block builder requests every second
runRetryLoop(ctx, 500*time.Millisecond, func() {
log.Debug("retrying BuildBlock", "slot", attrs.Slot, "parent", attrs.HeadHash)
// resubmits block builder requests every builderBlockResubmitInterval
runRetryLoop(ctx, b.builderResubmitInterval, func() {
log.Debug("retrying BuildBlock",
"slot", attrs.Slot,
"parent", attrs.HeadHash,
"resubmit-interval", b.builderResubmitInterval.String())
err := b.eth.BuildBlock(attrs, blockHook)
if err != nil {
log.Warn("Failed to build block", "err", err)
Expand Down
1 change: 1 addition & 0 deletions builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func TestOnPayloadAttributes(t *testing.T) {
ignoreLatePayloadAttributes: false,
validator: nil,
beaconClient: &testBeacon,
limiter: nil,
}
builder := NewBuilder(builderArgs)
builder.Start()
Expand Down
43 changes: 24 additions & 19 deletions builder/config.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
package builder

type Config struct {
Enabled bool `toml:",omitempty"`
EnableValidatorChecks bool `toml:",omitempty"`
EnableLocalRelay bool `toml:",omitempty"`
SlotsInEpoch uint64 `toml:",omitempty"`
SecondsInSlot uint64 `toml:",omitempty"`
DisableBundleFetcher bool `toml:",omitempty"`
DryRun bool `toml:",omitempty"`
IgnoreLatePayloadAttributes bool `toml:",omitempty"`
BuilderSecretKey string `toml:",omitempty"`
RelaySecretKey string `toml:",omitempty"`
ListenAddr string `toml:",omitempty"`
GenesisForkVersion string `toml:",omitempty"`
BellatrixForkVersion string `toml:",omitempty"`
GenesisValidatorsRoot string `toml:",omitempty"`
BeaconEndpoints []string `toml:",omitempty"`
RemoteRelayEndpoint string `toml:",omitempty"`
SecondaryRemoteRelayEndpoints []string `toml:",omitempty"`
ValidationBlocklist string `toml:",omitempty"`
EnableCancellations bool `toml:",omitempty"`
Enabled bool `toml:",omitempty"`
EnableValidatorChecks bool `toml:",omitempty"`
EnableLocalRelay bool `toml:",omitempty"`
SlotsInEpoch uint64 `toml:",omitempty"`
SecondsInSlot uint64 `toml:",omitempty"`
DisableBundleFetcher bool `toml:",omitempty"`
DryRun bool `toml:",omitempty"`
IgnoreLatePayloadAttributes bool `toml:",omitempty"`
BuilderSecretKey string `toml:",omitempty"`
RelaySecretKey string `toml:",omitempty"`
ListenAddr string `toml:",omitempty"`
GenesisForkVersion string `toml:",omitempty"`
BellatrixForkVersion string `toml:",omitempty"`
GenesisValidatorsRoot string `toml:",omitempty"`
BeaconEndpoints []string `toml:",omitempty"`
RemoteRelayEndpoint string `toml:",omitempty"`
SecondaryRemoteRelayEndpoints []string `toml:",omitempty"`
ValidationBlocklist string `toml:",omitempty"`
BuilderRateLimitDuration string `toml:",omitempty"`
BuilderRateLimitMaxBurst int `toml:",omitempty"`
BuilderRateLimitResubmitInterval string `toml:",omitempty"`
EnableCancellations bool `toml:",omitempty"`
}

// DefaultConfig is the default config for the builder.
Expand All @@ -42,6 +45,8 @@ var DefaultConfig = Config{
RemoteRelayEndpoint: "",
SecondaryRemoteRelayEndpoints: nil,
ValidationBlocklist: "",
BuilderRateLimitDuration: RateLimitIntervalDefault.String(),
BuilderRateLimitMaxBurst: RateLimitBurstDefault,
EnableCancellations: false,
}

Expand Down
7 changes: 5 additions & 2 deletions builder/local_relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func newTestBackend(t *testing.T, forkchoiceData *engine.ExecutableData, block *
ignoreLatePayloadAttributes: false,
validator: nil,
beaconClient: beaconClient,
limiter: nil,
}
backend := NewBuilder(builderArgs)
// service := NewService("127.0.0.1:31545", backend)
Expand Down Expand Up @@ -155,7 +156,8 @@ func TestGetHeader(t *testing.T) {
require.Equal(t, ``, rr.Body.String())
require.Equal(t, 204, rr.Code)

backend.OnPayloadAttribute(&types.BuilderPayloadAttributes{})
err = backend.OnPayloadAttribute(&types.BuilderPayloadAttributes{})
require.NoError(t, err)
time.Sleep(2 * time.Second)

path = fmt.Sprintf("/eth/v1/builder/header/%d/%s/%s", 0, forkchoiceData.ParentHash.Hex(), validator.Pk.String())
Expand Down Expand Up @@ -203,7 +205,8 @@ func TestGetPayload(t *testing.T) {
backend, relay, validator := newTestBackend(t, forkchoiceData, forkchoiceBlock, forkchoiceBlockProfit)

registerValidator(t, validator, relay)
backend.OnPayloadAttribute(&types.BuilderPayloadAttributes{})
err = backend.OnPayloadAttribute(&types.BuilderPayloadAttributes{})
require.NoError(t, err)
time.Sleep(2 * time.Second)

path := fmt.Sprintf("/eth/v1/builder/header/%d/%s/%s", 0, forkchoiceData.ParentHash.Hex(), validator.Pk.String())
Expand Down
35 changes: 33 additions & 2 deletions builder/resubmit_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,44 @@ import (
)

// runResubmitLoop checks for update signal and calls submit respecting provided rate limiter and context
func runResubmitLoop(ctx context.Context, limiter *rate.Limiter, updateSignal chan struct{}, submit func()) {
func runResubmitLoop(ctx context.Context, limiter *rate.Limiter, updateSignal <-chan struct{}, submit func(), submitTime time.Time) {
if submitTime.IsZero() {
log.Warn("skipping resubmit loop - zero submit time found")
return
}

var (
waitUntilSubmitTime = func(waitUntil time.Time) (ok bool, err error) {
now := time.Now().UTC()
if waitUntil.UTC().Before(now) {
waitUntil = now
}
sleepTime := waitUntil.UTC().Sub(now.UTC())
select {
case <-ctx.Done():
ok = false
case <-time.After(sleepTime):
ok = true
}
return ok && ctx.Err() == nil, ctx.Err()
}
)

if canContinue, err := waitUntilSubmitTime(submitTime); !canContinue {
log.Warn("skipping resubmit loop - cannot continue", "error", err)
return
}

var res *rate.Reservation
for {
select {
case <-ctx.Done():
return
case <-updateSignal:
res := limiter.Reserve()
// runBuildingJob is example caller that uses updateSignal channel via block hook that sends signal to
// represent submissions that increase block profit

res = limiter.Reserve()
if !res.OK() {
log.Warn("resubmit loop failed to make limiter reservation")
return
Expand Down
Loading

0 comments on commit 66893df

Please sign in to comment.