Skip to content

Commit

Permalink
Do not store validators 0 balances.
Browse files Browse the repository at this point in the history
  • Loading branch information
mcdee committed Feb 5, 2025
1 parent c863f9b commit c14183c
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 87 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
0.8.9:
- do not store 0 validator balances (i.e. from validators that are not active, or have exited and withdrawn)

0.8.8:
- handle upstream change of epoch serialization for metadata
- do not error on deposit transactions containing events without topics
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ import (
)

// ReleaseVersion is the release version for the code.
var ReleaseVersion = "0.8.8"
var ReleaseVersion = "0.8.9"

func main() {
os.Exit(main2())
Expand Down
164 changes: 78 additions & 86 deletions services/chaindb/postgresql/validators.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2020 - 2023 Weald Technology Trading.
// Copyright © 2020 - 2025 Weald Technology Trading.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -380,14 +380,25 @@ func (s *Service) ValidatorBalancesByEpoch(
tx = s.tx(ctx)
}

// The database does not store balances for 0-balance validators, so join the balances with the validator table to ensure we
// have an returned row for every validator.
rows, err := tx.Query(ctx, `
SELECT f_validator_index
,f_epoch
,f_balance
,f_effective_balance
FROM t_validator_balances
WHERE f_epoch = $1::BIGINT
ORDER BY f_validator_index`,
WITH v_validator_balances AS (
SELECT f_validator_index
,f_epoch
,f_balance
,f_effective_balance
FROM t_validator_balances
WHERE f_epoch = $1
ORDER BY f_validator_index)
SELECT f_index
,COALESCE(f_epoch, $1) AS f_epoch
,COALESCE(f_balance, 0) AS f_balance
,COALESCE(v_validator_balances.f_effective_balance, 0) AS f_effective_balance
FROM t_validators
LEFT JOIN v_validator_balances
ON t_validators.f_index = v_validator_balances.f_validator_index
ORDER BY f_index`,
uint64(epoch),
)
if err != nil {
Expand All @@ -402,9 +413,10 @@ func (s *Service) ValidatorBalancesByEpoch(
if err != nil {
return nil, errors.Wrap(err, "failed to scan row")
}

validatorBalances = append(validatorBalances, validatorBalance)
if uint64(validatorBalance.Index) != uint64(len(validatorBalances)-1) {
panic(fmt.Sprintf("bad index %d with len %d", validatorBalance.Index, len(validatorBalances)))
panic(fmt.Sprintf("bad index %d with len %d at epoch %d", validatorBalance.Index, len(validatorBalances), epoch))
}
}

Expand Down Expand Up @@ -437,15 +449,26 @@ func (s *Service) ValidatorBalancesByIndexAndEpoch(
tx = s.tx(ctx)
}

// The database does not store balances for 0-balance validators, so join the balances with the validator table to ensure we
// have an returned row for every validator.
rows, err := tx.Query(ctx, `
SELECT f_validator_index
,f_epoch
,f_balance
,f_effective_balance
FROM t_validator_balances
WHERE f_epoch = $2::BIGINT
AND f_validator_index = ANY($1)
ORDER BY f_validator_index`,
WITH v_validator_balances AS(
SELECT f_validator_index
,f_epoch
,f_balance
,f_effective_balance
FROM t_validator_balances
WHERE f_epoch = $2
AND f_validator_index = ANY($1))
SELECT f_index
,COALESCE(f_epoch, $2) AS f_epoch
,COALESCE(f_balance, 0) AS f_balance
,COALESCE(v_validator_balances.f_effective_balance, 0) AS f_effective_balance
FROM t_validators
LEFT JOIN v_validator_balances
ON t_validators.f_index = v_validator_balances.f_validator_index
WHERE t_validators.f_index = ANY($1)
ORDER BY f_index`,
validatorIndices,
uint64(epoch),
)
Expand Down Expand Up @@ -501,27 +524,28 @@ func (s *Service) ValidatorBalancesByIndexAndEpochRange(
return validatorIndices[i] < validatorIndices[j]
})

// Create an array for the validator indices. This gives us higher performance for our query.
indices := make([]string, len(validatorIndices))
for i, validatorIndex := range validatorIndices {
indices[i] = fmt.Sprintf("(%d)", validatorIndex)
// Create a matrix of the values we require. This allows the database to fill in the blanks when it doesn't have a balance for
// the required (index,epoch) tuple (for exmple when the balance is 0).
values := make([]string, 0)
for _, validatorIndex := range validatorIndices {
for epoch := startEpoch; epoch < endEpoch; epoch++ {
values = append(values, fmt.Sprintf("(%d,%d)", validatorIndex, epoch))
}
}

rows, err := tx.Query(ctx, fmt.Sprintf(`
SELECT f_validator_index
,f_epoch
,f_balance
,f_effective_balance
FROM t_validator_balances
JOIN (VALUES %s)
AS x(id)
ON x.id = t_validator_balances.f_validator_index
WHERE f_epoch >= $1
AND f_epoch < $2
ORDER BY f_validator_index
,f_epoch`, strings.Join(indices, ",")),
uint64(startEpoch),
uint64(endEpoch),
WITH v_validator_epochs(f_validator_index, f_epoch) AS(VALUES %s)
SELECT v_validator_epochs.f_validator_index
,v_validator_epochs.f_epoch
,COALESCE(f_balance, 0) AS f_balance
,COALESCE(f_effective_balance, 0) AS f_effective_balance
FROM v_validator_epochs
LEFT JOIN t_validator_balances
ON t_validator_balances.f_validator_index = v_validator_epochs.f_validator_index
AND t_validator_balances.f_epoch = v_validator_epochs.f_epoch
ORDER BY f_validator_index
,f_epoch`,
strings.Join(values, ",")),
)
if err != nil {
return nil, err
Expand All @@ -541,12 +565,6 @@ func (s *Service) ValidatorBalancesByIndexAndEpochRange(
validatorBalances[validatorBalance.Index] = append(validatorBalances[validatorBalance.Index], validatorBalance)
}

// If a validator is not present until after the beginning of the range, for example we ask for epochs 5->10 and
// the validator is first present at epoch 7, we need to front-pad the data for that validator with 0s.
if err := padValidatorBalances(validatorBalances, int(uint64(endEpoch)-uint64(startEpoch)), startEpoch); err != nil {
return nil, err
}

return validatorBalances, nil
}

Expand Down Expand Up @@ -581,29 +599,28 @@ func (s *Service) ValidatorBalancesByIndexAndEpochs(
return validatorIndices[i] < validatorIndices[j]
})

// Create an array for the validator indices. This gives us higher performance for our query.
indices := make([]string, len(validatorIndices))
for i, validatorIndex := range validatorIndices {
indices[i] = fmt.Sprintf("(%d)", validatorIndex)
// Create a matrix of the values we require. This allows the database to fill in the blanks when it doesn't have a balance for
// the required (index,epoch) tuple (for exmple when the balance is 0).
values := make([]string, 0)
for _, validatorIndex := range validatorIndices {
for _, epoch := range epochs {
values = append(values, fmt.Sprintf("(%d,%d)", validatorIndex, epoch))
}
}

dbEpochs := make([]uint64, len(epochs))
for i, epoch := range epochs {
dbEpochs[i] = uint64(epoch)
}
rows, err := tx.Query(ctx, fmt.Sprintf(`
SELECT f_validator_index
,f_epoch
,f_balance
,f_effective_balance
FROM t_validator_balances
JOIN (VALUES %s)
AS x(id)
ON x.id = t_validator_balances.f_validator_index
WHERE f_epoch = ANY($1)
ORDER BY f_validator_index
,f_epoch`, strings.Join(indices, ",")),
dbEpochs,
WITH v_validator_epochs(f_validator_index, f_epoch) AS(VALUES %s)
SELECT v_validator_epochs.f_validator_index
,v_validator_epochs.f_epoch
,COALESCE(f_balance, 0) AS f_balance
,COALESCE(f_effective_balance, 0) AS f_effective_balance
FROM v_validator_epochs
LEFT JOIN t_validator_balances
ON t_validator_balances.f_validator_index = v_validator_epochs.f_validator_index
AND t_validator_balances.f_epoch = v_validator_epochs.f_epoch
ORDER BY f_validator_index
,f_epoch`,
strings.Join(values, ",")),
)
if err != nil {
return nil, err
Expand All @@ -626,31 +643,6 @@ func (s *Service) ValidatorBalancesByIndexAndEpochs(
return validatorBalances, nil
}

func padValidatorBalances(validatorBalances map[phase0.ValidatorIndex][]*chaindb.ValidatorBalance, entries int, startEpoch phase0.Epoch) error {
for validatorIndex, balances := range validatorBalances {
if len(balances) != entries {
paddedBalances := make([]*chaindb.ValidatorBalance, entries)
padding := entries - len(balances)
for i := range padding {
paddedBalances[i] = &chaindb.ValidatorBalance{
Index: validatorIndex,
Epoch: startEpoch + phase0.Epoch(i),
Balance: 0,
EffectiveBalance: 0,
}
}
if len(balances) > 0 && balances[0].Epoch != startEpoch+phase0.Epoch(padding) {
return fmt.Errorf("data missing in chaindb for validator %d", validatorIndex)
}

copy(paddedBalances[padding:], balances)
validatorBalances[validatorIndex] = paddedBalances
}
}

return nil
}

// validatorFromRow converts a SQL row in to a validator.
func validatorFromRow(rows pgx.Rows) (*chaindb.Validator, error) {
var publicKey []byte
Expand Down
4 changes: 4 additions & 0 deletions services/finalizer/standard/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ func (s *Service) updateCanonicalBlocks(ctx context.Context, root phase0.Root) e
return errors.Wrap(err, "failed to obtain block")
}

if block == nil {
return fmt.Errorf("unable to obtain block for root %#x", root)
}

s.log.Trace().Uint64("slot", uint64(block.Slot)).Msg("Canonicalizing up to slot")

if err := s.canonicalizeBlocks(ctx, root, phase0.Slot(md.LatestCanonicalSlot)); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions services/validators/standard/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,10 @@ func (s *Service) onEpochTransitionValidatorBalancesForEpoch(ctx context.Context
if s.balances {
dbValidatorBalances := make([]*chaindb.ValidatorBalance, 0, len(validators))
for index, validator := range validators {
// Do not store 0 balances.
if validator.Balance == 0 {
continue
}
dbValidatorBalances = append(dbValidatorBalances, &chaindb.ValidatorBalance{
Index: index,
Epoch: epoch,
Expand Down

0 comments on commit c14183c

Please sign in to comment.