Skip to content

Commit

Permalink
increase tortoise window for past layers on mainnet (#5762)
Browse files Browse the repository at this point in the history
closes: #5659

in order to decode ballots while syncing layers before 30_000, the window needs to be set to 10_000 (this is what the value was originally). otherwise many ballots can't be decoded, as they reference base ballots outside of the 4032 window. this breaks the chain of votes and makes tortoise unable to cross the necessary weight.

this change introduces a HistoricalWindowSize parameter in tortoise that sets the window size to 10_000 for layers before 30_000.
it also removes the eviction logic from atxsdata and moves it inside tortoise, so that we don't have to keep similar conditionals in more than one place.
  • Loading branch information
dshulyak committed Mar 22, 2024
1 parent 776f66c commit 3649c2d
Show file tree
Hide file tree
Showing 16 changed files with 95 additions and 91 deletions.
44 changes: 4 additions & 40 deletions atxsdata/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ import (
"github.com/spacemeshos/go-spacemesh/common/types"
)

// minCapacity is set to 2 epochs because we are using data from current epoch for tortoise
// and at the start of current epoch we still need data from previous epoch for hare oracle.
const minCapacity = 2

// SAFETY: all exported fields are read-only and are safe to read concurrently.
// Thanks to the fact that ATX is immutable, it is safe to return a pointer to it.
type ATX struct {
Expand All @@ -25,44 +21,16 @@ type ATX struct {
malicious bool
}

type Opt func(*Data)

// WithCapacity sets the number of epochs from the latest applied
// that cache will maintain in memory.
func WithCapacity(capacity types.EpochID) Opt {
return func(cache *Data) {
cache.capacity = max(minCapacity, capacity)
}
}

// WithCapacityFromLayers sets capacity to include all layers in the window.
func WithCapacityFromLayers(window, epochSize uint32) Opt {
capacity := window / epochSize
if window%epochSize != 0 {
capacity++
}
return WithCapacity(types.EpochID(capacity))
}

func New(opts ...Opt) *Data {
cache := &Data{
capacity: 2,
func New() *Data {
return &Data{
malicious: map[types.NodeID]struct{}{},
epochs: map[types.EpochID]epochCache{},
}
for _, opt := range opts {
opt(cache)
}
return cache
}

type Data struct {
evicted atomic.Uint32

// number of epochs to keep
// capacity is not enforced by the cache itself
capacity types.EpochID

mu sync.RWMutex
malicious map[types.NodeID]struct{}
epochs map[types.EpochID]epochCache
Expand All @@ -80,15 +48,11 @@ func (d *Data) IsEvicted(epoch types.EpochID) bool {
return d.evicted.Load() >= epoch.Uint32()
}

// OnEpoch is a notification for cache to evict epochs that are not useful
// EvictEpoch is a notification for cache to evict epochs that are not useful
// to keep in memory.
func (d *Data) OnEpoch(applied types.EpochID) {
func (d *Data) EvictEpoch(evict types.EpochID) {
d.mu.Lock()
defer d.mu.Unlock()
if applied < d.capacity {
return
}
evict := applied - d.capacity
if d.IsEvicted(evict) {
return
}
Expand Down
20 changes: 7 additions & 13 deletions atxsdata/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,16 @@ func TestData(t *testing.T) {
capacity = 3
applied = epochs / 2
)
c := New(WithCapacity(capacity))
c := New()
node := types.NodeID{1}
for epoch := 1; epoch <= epochs; epoch++ {
c.Add(types.EpochID(epoch), node, types.Address{}, types.ATXID{}, 2, 0, 0, 0, false)
data := c.Get(types.EpochID(epoch), types.ATXID{})
require.NotNil(t, data)
}
c.OnEpoch(applied)

evicted := applied - capacity
c.EvictEpoch(types.EpochID(evicted))
require.EqualValues(t, evicted, c.Evicted())
for epoch := 1; epoch <= epochs; epoch++ {
require.Equal(t, epoch <= evicted, c.IsEvicted(types.EpochID(epoch)), "epoch=%v", epoch)
Expand Down Expand Up @@ -130,18 +131,11 @@ func TestData(t *testing.T) {
})
t.Run("adding after eviction", func(t *testing.T) {
c := New()
c.OnEpoch(0)
c.OnEpoch(3)
c.EvictEpoch(0)
c.EvictEpoch(3)
c.Add(1, types.NodeID{1}, types.Address{}, types.ATXID{1}, 500, 100, 0, 0, false)
require.Nil(t, c.Get(3, types.ATXID{1}))
c.OnEpoch(3)
})

t.Run("capacity from layers", func(t *testing.T) {
c := New(WithCapacityFromLayers(10, 3))
c.OnEpoch(5)
require.True(t, c.IsEvicted(1))
require.False(t, c.IsEvicted(2))
c.EvictEpoch(3)
})
}

Expand All @@ -166,7 +160,7 @@ func TestMemory(t *testing.T) {
runtime.ReadMemStats(&after)
require.InDelta(t, after.HeapInuse-before.HeapInuse, memory, float64(delta))

c.OnEpoch(0) // otherwise cache will be gc'ed
c.EvictEpoch(0) // otherwise cache will be gc'ed
}
t.Run("1_000_000", func(t *testing.T) {
test(t, 1_000_000, 189_956_096, 300_000)
Expand Down
14 changes: 9 additions & 5 deletions atxsdata/warmup.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@ import (
"github.com/spacemeshos/go-spacemesh/sql/layers"
)

func Warm(db *sql.Database, opts ...Opt) (*Data, error) {
cache := New(opts...)
func Warm(db *sql.Database, keep types.EpochID) (*Data, error) {
cache := New()
tx, err := db.Tx(context.Background())
if err != nil {
return nil, err
}
defer tx.Release()
if err := Warmup(tx, cache); err != nil {
if err := Warmup(tx, cache, keep); err != nil {
return nil, fmt.Errorf("warmup %w", err)
}
return cache, nil
}

func Warmup(db sql.Executor, cache *Data) error {
func Warmup(db sql.Executor, cache *Data, keep types.EpochID) error {
latest, err := atxs.LatestEpoch(db)
if err != nil {
return err
Expand All @@ -33,7 +33,11 @@ func Warmup(db sql.Executor, cache *Data) error {
if err != nil {
return err
}
cache.OnEpoch(applied.GetEpoch())
var evict types.EpochID
if applied.GetEpoch() > keep {
evict = applied.GetEpoch() - keep - 1
}
cache.EvictEpoch(evict)

var ierr error
if err := atxs.IterateAtxsData(db, cache.Evicted(), latest,
Expand Down
10 changes: 5 additions & 5 deletions atxsdata/warmup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,29 @@ func TestWarmup(t *testing.T) {
}
require.NoError(t, layers.SetApplied(db, applied, types.BlockID{1}))

c, err := Warm(db, WithCapacity(1))
c, err := Warm(db, 1)
require.NoError(t, err)
for _, atx := range data[2:] {
require.NotNil(t, c.Get(atx.TargetEpoch(), atx.ID()))
}
})
t.Run("no data", func(t *testing.T) {
c, err := Warm(sql.InMemory(), WithCapacity(1))
c, err := Warm(sql.InMemory(), 1)
require.NoError(t, err)
require.NotNil(t, c)
})
t.Run("closed db", func(t *testing.T) {
db := sql.InMemory()
require.NoError(t, db.Close())
c, err := Warm(db, WithCapacity(1))
c, err := Warm(db, 1)
require.Error(t, err)
require.Nil(t, c)
})
t.Run("missing nonce", func(t *testing.T) {
db := sql.InMemory()
data := gatx(types.ATXID{1, 1}, 1, types.NodeID{1}, nil)
require.NoError(t, atxs.Add(db, &data))
c, err := Warm(db)
c, err := Warm(db, 1)
require.Error(t, err)
require.Nil(t, c)
})
Expand All @@ -105,7 +105,7 @@ func TestWarmup(t *testing.T) {
AnyTimes()
for i := 0; i < 5; i++ {
c := New()
require.Error(t, Warmup(exec, c))
require.Error(t, Warmup(exec, c, 1))
fail++
call = 0
}
Expand Down
3 changes: 3 additions & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ func MainnetConfig() Config {
MinimalActiveSetWeight: []types.EpochMinimalActiveWeight{
{Weight: 1_000_000},
},
HistoricalWindowSize: []tortoise.WindowSizeInterval{
{End: 30_000, Window: 10_000},
},
},
HARE3: hare3conf,
HareEligibility: eligibility.Config{
Expand Down
2 changes: 0 additions & 2 deletions mesh/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,6 @@ func (msh *Mesh) applyResults(ctx context.Context, results []result.Layer) error
Status: events.LayerStatusTypeApplied,
})
}

msh.atxsdata.OnEpoch(layer.Layer.GetEpoch())
if layer.Layer > msh.LatestLayerInState() {
msh.setLatestLayerInState(layer.Layer)
}
Expand Down
6 changes: 5 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1794,10 +1794,14 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
)
}
app.log.Info("starting cache warmup")
applied, err := layers.GetLastApplied(app.db)
if err != nil {
return err
}
start := time.Now()
data, err := atxsdata.Warm(
app.db,
atxsdata.WithCapacityFromLayers(app.Config.Tortoise.WindowSize, app.Config.LayersPerEpoch),
app.Config.Tortoise.WindowSizeEpochs(applied),
)
if err != nil {
return err
Expand Down
5 changes: 2 additions & 3 deletions proposals/eligibility_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,9 +563,8 @@ func TestEligibilityValidator(t *testing.T) {
}).AnyTimes()

lg := logtest.New(t)
const capacity = 2
c := atxsdata.New(atxsdata.WithCapacity(capacity))
c.OnEpoch(tc.evicted + capacity)
c := atxsdata.New()
c.EvictEpoch(tc.evicted)
tv := NewEligibilityValidator(
layerAvgSize,
layersPerEpoch,
Expand Down
31 changes: 28 additions & 3 deletions tortoise/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import (

// Config for protocol parameters.
type Config struct {
Hdist uint32 `mapstructure:"tortoise-hdist"` // hare output lookback distance
Zdist uint32 `mapstructure:"tortoise-zdist"` // hare result wait distance
// how long we are waiting for a switch from verifying to full. relevant during rerun.
WindowSize uint32 `mapstructure:"tortoise-window-size"` // size of the tortoise sliding window (in layers)
Hdist uint32 `mapstructure:"tortoise-hdist"` // hare output lookback distance
Zdist uint32 `mapstructure:"tortoise-zdist"` // hare result wait distance
WindowSize uint32 `mapstructure:"tortoise-window-size"` // size of the tortoise sliding window (in layers)
HistoricalWindowSize []WindowSizeInterval
// ignored if candidate for base ballot has more than max exceptions
MaxExceptions int `mapstructure:"tortoise-max-exceptions"`
// number of layers to delay votes for blocks with bad beacon values during self-healing. ideally a full epoch.
Expand All @@ -36,6 +37,12 @@ type Config struct {
LayerSize uint32
}

// WindowSizeInterval overrides the tortoise window size for a range of layers.
//
// Config.WindowSizeLayers treats the range as inclusive on both ends: the
// override applies to a layer when Start <= layer <= End.
type WindowSizeInterval struct {
	// Start and End are the first and the last layer covered by the interval.
	Start, End types.LayerID
	// Window is the window size returned for layers inside the interval.
	Window uint32
}

// DefaultConfig for Tortoise.
func DefaultConfig() Config {
return Config{
Expand All @@ -48,6 +55,24 @@ func DefaultConfig() Config {
}
}

// WindowSizeLayers returns the window size to use for the given layer.
//
// Entries of HistoricalWindowSize are checked in order, and the first one whose
// [Start, End] range (inclusive on both ends) contains the layer determines the
// result. When no entry matches, the default WindowSize is returned.
func (c *Config) WindowSizeLayers(applied types.LayerID) types.LayerID {
	for i := range c.HistoricalWindowSize {
		historical := &c.HistoricalWindowSize[i]
		if applied < historical.Start || applied > historical.End {
			// layer is outside of this interval, try the next one.
			continue
		}
		return types.LayerID(historical.Window)
	}
	return types.LayerID(c.WindowSize)
}

// WindowSizeEpochs returns the window size for the given layer expressed in
// epochs. The result is rounded up, so a window that covers an epoch only
// partially still counts that epoch.
//
// The window in layers is obtained from WindowSizeLayers and the epoch length
// from types.GetLayersPerEpoch.
//
// NOTE(review): the division panics if types.GetLayersPerEpoch returns 0;
// presumably it is always configured before this is called - confirm.
func (c *Config) WindowSizeEpochs(applied types.LayerID) types.EpochID {
	layers := c.WindowSizeLayers(applied)
	// read the epoch length once, so that the quotient and the remainder are
	// computed against the same divisor.
	perEpoch := types.LayerID(types.GetLayersPerEpoch())
	epochs := layers / perEpoch
	if layers%perEpoch != 0 {
		// round up to include the partially covered epoch.
		epochs++
	}
	return types.EpochID(epochs)
}

// Tortoise is a thread safe verifying tortoise wrapper, it just locks all actions.
type Tortoise struct {
logger *zap.Logger
Expand Down
4 changes: 2 additions & 2 deletions tortoise/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ func Recover(
}

start := types.GetEffectiveGenesis() + 1
if applied > types.LayerID(trtl.cfg.WindowSize) {
if size := trtl.cfg.WindowSizeLayers(applied); applied > size {
// we want to emulate the same condition as during genesis with one difference.
// genesis starts with zero opinion (aggregated hash) - see computeOpinion method.
// but in this case first processed layer should use non-zero opinion of the the previous layer.

window := applied - types.LayerID(trtl.cfg.WindowSize)
window := applied - size
// we start tallying votes from the first layer of the epoch to guarantee that we load reference ballots.
// reference ballots track beacon and eligibilities
window = window.GetEpoch().FirstLayer()
Expand Down
8 changes: 5 additions & 3 deletions tortoise/replay/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/spacemeshos/go-spacemesh/config"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/layers"
"github.com/spacemeshos/go-spacemesh/timesync"
"github.com/spacemeshos/go-spacemesh/tortoise"
)
Expand Down Expand Up @@ -54,10 +55,11 @@ func TestReplayMainnet(t *testing.T) {
db, err := sql.Open(fmt.Sprintf("file:%s?mode=ro", *dbpath))
require.NoError(t, err)

applied, err := layers.GetLastApplied(db)
require.NoError(t, err)

start := time.Now()
atxsdata, err := atxsdata.Warm(db,
atxsdata.WithCapacityFromLayers(cfg.Tortoise.WindowSize, cfg.LayersPerEpoch),
)
atxsdata, err := atxsdata.Warm(db, cfg.Tortoise.WindowSizeEpochs(applied))
require.NoError(t, err)
trtl, err := tortoise.Recover(
context.Background(),
Expand Down
2 changes: 1 addition & 1 deletion tortoise/sim/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func New(opts ...GenOpt) *Generator {
}
// TODO support multiple persist states.
for i := 0; i < g.conf.StateInstances; i++ {
atxdata := atxsdata.New(atxsdata.WithCapacityFromLayers(g.conf.WindowSize, g.conf.LayersPerEpoch))
atxdata := atxsdata.New()
g.states = append(g.states, newState(g.logger, g.conf, atxdata))
}
return g
Expand Down
5 changes: 3 additions & 2 deletions tortoise/threshold.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ func computeExpectedWeightInWindow(
target, processed, last types.LayerID,
) weight {
window := last
if last.Difference(target) > config.WindowSize {
window = target.Add(config.WindowSize)
windowSize := config.WindowSizeLayers(target)
if last.Difference(target) > windowSize.Uint32() {
window = target.Add(windowSize.Uint32())
if processed.After(window) {
window = processed
}
Expand Down
16 changes: 16 additions & 0 deletions tortoise/threshold_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,22 @@ func TestComputeThreshold(t *testing.T) {
},
expectedGlobal: fixed.From(11.25),
},
{
desc: "historical window size based on the target layer",
config: Config{
WindowSize: 2,
HistoricalWindowSize: []WindowSizeInterval{
{End: genesis.Add(length), Window: 4},
},
},
last: genesis.Add(2 * length),
target: genesis,
epochs: map[types.EpochID]*epochInfo{
2: {weight: fixed.From(45)},
3: {weight: fixed.From(45)},
},
expectedGlobal: fixed.From(15),
},
} {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
Expand Down
Loading

0 comments on commit 3649c2d

Please sign in to comment.