Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - in memory index with atx data for consensus layer #5013

Closed
wants to merge 46 commits into from
Closed
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b3fbc3e
add data structure for cache
dshulyak Sep 14, 2023
d5e1e10
warmup and eviction handler
dshulyak Sep 15, 2023
e4009bd
populate cache and evict
dshulyak Sep 15, 2023
dc74a14
Merge branch 'develop' into consensus-cache
dshulyak Sep 19, 2023
f3b4fdb
support slow and fast path using cache in proposals handler
dshulyak Sep 19, 2023
bcbb769
bad init
dshulyak Sep 19, 2023
22dd804
Merge branch 'develop' into consensus-cache
dshulyak Sep 20, 2023
bfda8d9
proposals back to 100
dshulyak Sep 20, 2023
2ed1e21
finish basic coverage
dshulyak Sep 21, 2023
fd8b5aa
reuse lru cache to set malicious
dshulyak Sep 21, 2023
94d20be
Merge branch 'develop' into consensus-cache
dshulyak Sep 21, 2023
d976e04
resolve conflicts
dshulyak Sep 21, 2023
b9f59b2
Merge branch 'develop' into consensus-cache
dshulyak Sep 21, 2023
14338fe
add relevant benchmark and memory estimate
dshulyak Sep 21, 2023
5cc11d6
fix test in miner
dshulyak Sep 21, 2023
ee6a98b
add metrics to track atxs/identities in cache
dshulyak Sep 21, 2023
7f47c0f
change default capacity to 2
dshulyak Sep 21, 2023
2b0410f
Merge branch 'develop' into consensus-cache
dshulyak Sep 21, 2023
5a70266
memory assertion
dshulyak Sep 21, 2023
4c790c3
regen with 0.3.0
dshulyak Sep 21, 2023
2a9830c
Merge branch 'develop' into consensus-cache
dshulyak Sep 25, 2023
d6f896b
fix some mistakes
dshulyak Sep 27, 2023
31f226f
remove cachedb from handler
dshulyak Sep 27, 2023
c0f7a20
Merge branch 'develop' into consensus-cache
dshulyak Oct 19, 2023
e887d07
refactor to use btree
dshulyak Oct 19, 2023
b510321
switch to pointers
dshulyak Oct 19, 2023
fcf9e2e
add weight for set routine
dshulyak Oct 19, 2023
ba31735
add benchmark
dshulyak Oct 20, 2023
dbfee1c
use binary search instead of map
dshulyak Oct 20, 2023
5eefe50
integrate in proposals
dshulyak Oct 20, 2023
7782971
Merge branch 'develop' into consensus-cache
dshulyak Oct 20, 2023
edd6406
btree dep
dshulyak Oct 20, 2023
8990e36
add with capacity from layers
dshulyak Oct 22, 2023
e6d1ca4
configure capacity based on tortoise window
dshulyak Oct 22, 2023
de00596
drop slowpath code
dshulyak Oct 22, 2023
4696054
drop todo
dshulyak Oct 22, 2023
184b9e3
rename cache to atxsdata
dshulyak Oct 22, 2023
36a02f8
cleanup
dshulyak Oct 22, 2023
1d3d3ea
Merge branch 'develop' into consensus-cache
dshulyak Oct 22, 2023
6a65672
add hashmap for malicious
dshulyak Oct 22, 2023
7986ca5
refactor to store index by atxid
dshulyak Oct 22, 2023
c62029e
drop binary search bench
dshulyak Oct 22, 2023
15644cf
fix test
dshulyak Oct 23, 2023
9959c38
cleanup locking
dshulyak Oct 24, 2023
85e48c8
Merge branch 'develop' into consensus-cache
dshulyak Oct 24, 2023
fbf6667
review
dshulyak Oct 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/spacemeshos/post/shared"
"golang.org/x/exp/maps"

"github.com/spacemeshos/go-spacemesh/cache"
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
Expand Down Expand Up @@ -40,6 +41,7 @@ type atxChan struct {
// Handler processes the atxs received from all nodes and their validity status.
type Handler struct {
cdb *datastore.CachedDB
cache *cache.Cache
edVerifier *signing.EdVerifier
clock layerClock
publisher pubsub.Publisher
Expand All @@ -58,6 +60,7 @@ type Handler struct {
// NewHandler returns a data handler for ATX.
func NewHandler(
cdb *datastore.CachedDB,
cache *cache.Cache,
edVerifier *signing.EdVerifier,
c layerClock,
pub pubsub.Publisher,
Expand All @@ -72,6 +75,7 @@ func NewHandler(
) *Handler {
return &Handler{
cdb: cdb,
cache: cache,
edVerifier: edVerifier,
clock: c,
publisher: pub,
Expand Down Expand Up @@ -356,6 +360,22 @@ func (h *Handler) ContextuallyValidateAtx(atx *types.VerifiedActivationTx) error
return err
}

func (h *Handler) cacheAtx(ctx context.Context, atx *types.ActivationTxHeader) {
if !h.cache.IsEvicted(atx.TargetEpoch()) {
dshulyak marked this conversation as resolved.
Show resolved Hide resolved
nonce, err := h.cdb.VRFNonce(atx.NodeID, atx.TargetEpoch())
if err != nil {
h.log.With().Error("failed vrf nonce read", log.Err(err), log.Context(ctx))
return
}
malicious, err := h.cdb.IsMalicious(atx.NodeID)
if err != nil {
h.log.With().Error("failed is malicious read", log.Err(err), log.Context(ctx))
return
}
h.cache.Add(atx.TargetEpoch(), atx.NodeID, atx.ID, cache.ToATXData(atx, nonce, malicious))
}
}

// storeAtx stores an ATX and notifies subscribers of the ATXID.
func (h *Handler) storeAtx(ctx context.Context, atx *types.VerifiedActivationTx) error {
malicious, err := h.cdb.IsMalicious(atx.SmesherID)
Expand Down Expand Up @@ -421,6 +441,7 @@ func (h *Handler) storeAtx(ctx context.Context, atx *types.VerifiedActivationTx)
}
h.beacon.OnAtx(header)
h.tortoise.OnAtx(header.ToData())
h.cacheAtx(ctx, header)

// notify subscribers
if ch, found := h.atxChannels[atx.ID()]; found {
Expand Down
3 changes: 2 additions & 1 deletion activation/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/spacemeshos/go-spacemesh/cache"
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
Expand Down Expand Up @@ -101,7 +102,7 @@ func newTestHandler(tb testing.TB, goldenATXID types.ATXID) *testHandler {
mbeacon := NewMockAtxReceiver(ctrl)
mtortoise := mocks.NewMockTortoise(ctrl)

atxHdlr := NewHandler(cdb, verifier, mclock, mpub, mockFetch, 1, goldenATXID, mValidator, mbeacon, mtortoise, lg, PoetConfig{})
atxHdlr := NewHandler(cdb, cache.New(), verifier, mclock, mpub, mockFetch, 1, goldenATXID, mValidator, mbeacon, mtortoise, lg, PoetConfig{})
return &testHandler{
Handler: atxHdlr,

Expand Down
186 changes: 186 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package cache

import (
"bytes"
"sort"
"sync"

"go.uber.org/atomic"

"github.com/spacemeshos/go-spacemesh/common/types"
)

type ATXData struct {
dshulyak marked this conversation as resolved.
Show resolved Hide resolved
Weight uint64
BaseHeight, Height uint64
Nonce types.VRFPostIndex
Malicious bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think nonce and malicious is a property of identity, not atx.

if an identity is marked malicious in epoch 3, it should still be malicious in epoch 6. but with the current design, if the node didn't restart between those epochs, it will be forgotten.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't matter whose property logically is it. it is about how data is stored and indexed.
identity is relevant in consensus only if it has atx in this epoch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

identity is relevant in consensus only if it has atx in this epoch.

this is theoretically true, but in practice we already have this in hare/tortoise where we use nodeid->malicious mapping to avoid checking nodeid->"atx in this epoch" lookup.

and cachedb's IsMalicious() is also by nodeid. how do you plan to replace that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hare GetByNode, tortoise Get

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

specifically hare will use only fastpath that relies on consensus data, it won't even need to have references to db for that.

tortoise (proposal/ballots handler) will query using Get(atx)

}

// Opt is a functional option that New applies to the Cache it constructs.
type Opt func(*Cache)

// WithCapacity configures how many epochs, counted back from the latest
// applied one, the cache keeps in memory.
func WithCapacity(capacity types.EpochID) Opt {
	return func(c *Cache) {
		c.capacity = capacity
	}
}

// New builds an empty Cache with a default capacity of 2 epochs and then
// applies the given options in the order they were passed.
func New(opts ...Opt) *Cache {
	c := &Cache{
		epochs:   make(map[types.EpochID]epochCache),
		capacity: 2,
	}
	for i := range opts {
		opts[i](c)
	}
	return c
}

// Cache is an in-memory index of atx data grouped by epoch.
type Cache struct {
// eviction marker: OnEpoch raises it, and IsEvicted reports every epoch
// at or below it as evicted
evicted atomic.Uint32

// number of epochs to keep
// capacity is enforced by the cache itself
capacity types.EpochID

// mu guards epochs
mu sync.RWMutex
epochs map[types.EpochID]epochCache
}

// epochCache holds everything indexed for a single epoch.
type epochCache struct {
// atx data keyed by atx id
atxs map[types.ATXID]*ATXData
// atx ids registered for each node id; Add keeps every slice sorted by id bytes
identities map[types.NodeID][]types.ATXID
}

// Evicted returns the current value of the eviction marker as an epoch id.
func (c *Cache) Evicted() types.EpochID {
	marker := c.evicted.Load()
	return types.EpochID(marker)
}

// IsEvicted reports whether epoch is at or below the eviction marker.
// The marker starts at zero, so epoch 0 is reported as evicted from the start.
func (c *Cache) IsEvicted(epoch types.EpochID) bool {
	return epoch.Uint32() <= c.evicted.Load()
}

// OnEpoch tells the cache which epoch was applied most recently so that it
// can drop epochs that are no longer worth keeping in memory.
//
// Every epoch at or below applied-capacity is removed together with its
// metric labels. The call is a no-op when applied is smaller than the
// capacity or when that threshold has already been evicted.
func (c *Cache) OnEpoch(applied types.EpochID) {
	if applied < c.capacity {
		// nothing is old enough to drop yet
		return
	}
	threshold := applied - c.capacity
	if c.IsEvicted(threshold) {
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	// re-check under the lock: the marker must only ever move forward
	if want := threshold.Uint32(); c.evicted.Load() < want {
		c.evicted.Store(want)
	}
	for epoch := range c.epochs {
		if epoch > threshold {
			continue
		}
		delete(c.epochs, epoch)
		label := epoch.String()
		atxsCounter.DeleteLabelValues(label)
		identitiesCounter.DeleteLabelValues(label)
	}
}

func (c *Cache) Add(epoch types.EpochID, node types.NodeID, atx types.ATXID, data *ATXData) {
if c.IsEvicted(epoch) {
return
}
c.mu.Lock()
defer c.mu.Unlock()
ecache, exists := c.epochs[epoch]
if !exists {
ecache = epochCache{
atxs: map[types.ATXID]*ATXData{},
identities: map[types.NodeID][]types.ATXID{},
}
c.epochs[epoch] = ecache
}
if _, exists := ecache.atxs[atx]; exists {
return
}
ecache.atxs[atx] = data
atxs := ecache.identities[node]

atxsCounter.WithLabelValues(epoch.String()).Add(1)
if atxs == nil {
identitiesCounter.WithLabelValues(epoch.String()).Add(1)
}

atxs = append(atxs, atx)
// NOTE(dshulyak) sorting doesn't make much sense, as we don't guarantee that every node
// will see the same atxs. it actually should see at most one atx, plus a malfeasance proof if the node equivocated.
// find out if we have a use case for this in consensus.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think we need this. the identity should already be set malicious when Cache.SetMalicious() is called.
and at that point it shouldn't matter which ATX is returned for a malicious identity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to store them to support NodeHasAtx. otherwise ballots won't be accepted. and for consistency it is better to keep them ordered

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there was a plan to only store the first equivocating atx/ballot and only sync those when they are referenced.
i see NodeHasAtx is used to check proposal eligibility. do you think this step is needed if the ballot is malicious?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does it matter?

sort.Slice(atxs, func(i, j int) bool {
return bytes.Compare(atxs[i].Bytes(), atxs[j].Bytes()) == -1
})
ecache.identities[node] = atxs
}

func (c *Cache) SetMalicious(node types.NodeID) {
c.mu.Lock()
defer c.mu.Unlock()
for _, ecache := range c.epochs {
for _, atx := range ecache.identities[node] {
data := ecache.atxs[atx]
// copy on update instead of copying on read
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't understand. if a reference is stored in the map, why not update it directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will be a race with returned values

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry still not clear. do you mean there will be a race if you do ecache.atxs[atx].Malicious = true directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pointer is returned in response

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

map stores a pointer, and it returns it to avoid copy on every read. during update that pointer is not protected by mutex, so if we update that memory directly race will be noticed and reported. therefore we copy it, update, and reinsert into map that is protected by mutex

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the explanation. i've made mistakes like this a few times.

updated := *data
updated.Malicious = true
ecache.atxs[atx] = &updated
}
}
}

// Get returns the data stored for atx in the given epoch, or nil when either
// the epoch or the atx is not present in the cache.
func (c *Cache) Get(epoch types.EpochID, atx types.ATXID) *ATXData {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if ecache, ok := c.epochs[epoch]; ok {
		// a missing key yields the zero value of the map, i.e. a nil pointer
		return ecache.atxs[atx]
	}
	return nil
}

// GetByNode returns the data of the node's first atx in lexicographic order
// for the given epoch, or nil when the epoch or the node is not cached.
func (c *Cache) GetByNode(epoch types.EpochID, node types.NodeID) *ATXData {
	c.mu.RLock()
	defer c.mu.RUnlock()

	ecache, ok := c.epochs[epoch]
	if ok {
		if ids, known := ecache.identities[node]; known {
			// Add keeps ids sorted, so the first entry is the smallest id
			first := ids[0]
			return ecache.atxs[first]
		}
	}
	return nil
}

// NodeHasAtx reports whether atx was registered for the given node id in
// the given epoch.
func (c *Cache) NodeHasAtx(epoch types.EpochID, node types.NodeID, atx types.ATXID) bool {
	c.mu.RLock()
	defer c.mu.RUnlock()

	// a missing epoch or node yields a nil slice, which the loop simply skips
	for _, id := range c.epochs[epoch].identities[node] {
		if id == atx {
			return true
		}
	}
	return false
}
Loading