Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

list atxs from cached epochs from memory in atx sync #5071

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/spacemeshos/post/shared"
"golang.org/x/exp/maps"

"github.com/spacemeshos/go-spacemesh/cache"
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
Expand Down Expand Up @@ -40,6 +41,7 @@ type atxChan struct {
// Handler processes the atxs received from all nodes and their validity status.
type Handler struct {
cdb *datastore.CachedDB
cache *cache.Cache
edVerifier *signing.EdVerifier
clock layerClock
publisher pubsub.Publisher
Expand All @@ -58,6 +60,7 @@ type Handler struct {
// NewHandler returns a data handler for ATX.
func NewHandler(
cdb *datastore.CachedDB,
cache *cache.Cache,
edVerifier *signing.EdVerifier,
c layerClock,
pub pubsub.Publisher,
Expand All @@ -72,6 +75,7 @@ func NewHandler(
) *Handler {
return &Handler{
cdb: cdb,
cache: cache,
edVerifier: edVerifier,
clock: c,
publisher: pub,
Expand Down Expand Up @@ -356,6 +360,22 @@ func (h *Handler) ContextuallyValidateAtx(atx *types.VerifiedActivationTx) error
return err
}

func (h *Handler) cacheAtx(ctx context.Context, atx *types.ActivationTxHeader) {
if !h.cache.IsEvicted(atx.TargetEpoch()) {
nonce, err := h.cdb.VRFNonce(atx.NodeID, atx.TargetEpoch())
if err != nil {
h.log.With().Error("failed vrf nonce read", log.Err(err), log.Context(ctx))
return
}
malicious, err := h.cdb.IsMalicious(atx.NodeID)
if err != nil {
h.log.With().Error("failed is malicious read", log.Err(err), log.Context(ctx))
return
}
h.cache.Add(atx.TargetEpoch(), atx.NodeID, atx.ID, cache.ToATXData(atx, nonce, malicious))
}
}

// storeAtx stores an ATX and notifies subscribers of the ATXID.
func (h *Handler) storeAtx(ctx context.Context, atx *types.VerifiedActivationTx) error {
malicious, err := h.cdb.IsMalicious(atx.SmesherID)
Expand Down Expand Up @@ -421,6 +441,7 @@ func (h *Handler) storeAtx(ctx context.Context, atx *types.VerifiedActivationTx)
}
h.beacon.OnAtx(header)
h.tortoise.OnAtx(header.ToData())
h.cacheAtx(ctx, header)

// notify subscribers
if ch, found := h.atxChannels[atx.ID()]; found {
Expand Down
3 changes: 2 additions & 1 deletion activation/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/spacemeshos/go-spacemesh/cache"
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
Expand Down Expand Up @@ -101,7 +102,7 @@ func newTestHandler(tb testing.TB, goldenATXID types.ATXID) *testHandler {
mbeacon := NewMockAtxReceiver(ctrl)
mtortoise := mocks.NewMockTortoise(ctrl)

atxHdlr := NewHandler(cdb, verifier, mclock, mpub, mockFetch, 1, goldenATXID, mValidator, mbeacon, mtortoise, lg, PoetConfig{})
atxHdlr := NewHandler(cdb, cache.New(), verifier, mclock, mpub, mockFetch, 1, goldenATXID, mValidator, mbeacon, mtortoise, lg, PoetConfig{})
return &testHandler{
Handler: atxHdlr,

Expand Down
204 changes: 204 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package cache

import (
"bytes"
"sort"
"sync"

"go.uber.org/atomic"

"github.com/spacemeshos/go-spacemesh/common/types"
)

type ATXData struct {
Weight uint64
BaseHeight, Height uint64
Nonce types.VRFPostIndex
Malicious bool
}

type Opt func(*Cache)

// WithCapacity sets the number of epochs from the latest applied
// that cache will maintain in memory.
func WithCapacity(capacity types.EpochID) Opt {
return func(cache *Cache) {
cache.capacity = capacity
}
}

func New(opts ...Opt) *Cache {
cache := &Cache{
capacity: 2,
epochs: map[types.EpochID]epochCache{},
}
for _, opt := range opts {
opt(cache)
}
return cache
}

type Cache struct {
evicted atomic.Uint32

// number of epochs to keep
// capacity is enforced by the cache itself
capacity types.EpochID

mu sync.RWMutex
epochs map[types.EpochID]epochCache
}

type epochCache struct {
atxs map[types.ATXID]*ATXData
identities map[types.NodeID][]types.ATXID
}

func (c *Cache) Evicted() types.EpochID {
return types.EpochID(c.evicted.Load())
}

func (c *Cache) IsEvicted(epoch types.EpochID) bool {
return c.evicted.Load() >= epoch.Uint32()
}

// OnApplied is a notification for cache to evict epochs that are not useful
// to keep in memory.
func (c *Cache) OnApplied(applied types.EpochID) {
if applied < c.capacity {
return
}
evict := applied - c.capacity
if c.IsEvicted(evict) {
return
}
c.mu.Lock()
defer c.mu.Unlock()
if c.evicted.Load() < applied.Uint32() {
c.evicted.Store(evict.Uint32())
}
for epoch := range c.epochs {
if epoch <= evict {
delete(c.epochs, epoch)
atxsCounter.DeleteLabelValues(epoch.String())
identitiesCounter.DeleteLabelValues(epoch.String())
}
}
}

func (c *Cache) Add(epoch types.EpochID, node types.NodeID, atx types.ATXID, data *ATXData) {
if c.IsEvicted(epoch) {
return
}
c.mu.Lock()
defer c.mu.Unlock()
ecache, exists := c.epochs[epoch]
if !exists {
ecache = epochCache{
atxs: map[types.ATXID]*ATXData{},
identities: map[types.NodeID][]types.ATXID{},
}
c.epochs[epoch] = ecache
}
if _, exists := ecache.atxs[atx]; exists {
return
}
ecache.atxs[atx] = data
atxs := ecache.identities[node]

atxsCounter.WithLabelValues(epoch.String()).Add(1)
if atxs == nil {
identitiesCounter.WithLabelValues(epoch.String()).Add(1)
}

atxs = append(atxs, atx)
// NOTE(dshulyak) doesn't make sense, as we don't guarantee that every node
// will see same atxs. it actually should see atmost one and malfeasence proof if node equivocated.
// find out if we have use case in consensus.
sort.Slice(atxs, func(i, j int) bool {
return bytes.Compare(atxs[i].Bytes(), atxs[j].Bytes()) == -1
})
ecache.identities[node] = atxs
}

func (c *Cache) SetMalicious(node types.NodeID) {
c.mu.Lock()
defer c.mu.Unlock()
for _, ecache := range c.epochs {
for _, atx := range ecache.identities[node] {
data := ecache.atxs[atx]
// copy on update instead of copying on read
updated := *data
updated.Malicious = true
ecache.atxs[atx] = &updated
}
}
}

// Get returns atx data.
func (c *Cache) Get(epoch types.EpochID, atx types.ATXID) *ATXData {
c.mu.RLock()
defer c.mu.RUnlock()
ecache, exists := c.epochs[epoch]
if !exists {
return nil
}
data, exists := ecache.atxs[atx]
if !exists {
return nil
}
return data
}

// GetByNode returns atx data of the first atx in lexicographic order.
func (c *Cache) GetByNode(epoch types.EpochID, node types.NodeID) *ATXData {
c.mu.RLock()
defer c.mu.RUnlock()
ecache, exists := c.epochs[epoch]
if !exists {
return nil
}
atxids, exists := ecache.identities[node]
if !exists {
return nil
}
return ecache.atxs[atxids[0]]
}

// List all known atxs.
func (c *Cache) List(epoch types.EpochID) []types.ATXID {
if c.IsEvicted(epoch) {
return nil
}
c.mu.RLock()
defer c.mu.RUnlock()
ecache, exists := c.epochs[epoch]
if !exists {
return nil
}
ids := make([]types.ATXID, 0, len(ecache.atxs))
for id := range ecache.atxs {
ids = append(ids, id)
}
return ids
}

// NodeHasAtx returns true if atx was registered with a given node id.
func (c *Cache) NodeHasAtx(epoch types.EpochID, node types.NodeID, atx types.ATXID) bool {
c.mu.RLock()
defer c.mu.RUnlock()
ecache, exists := c.epochs[epoch]
if !exists {
return false
}
atxids, exists := ecache.identities[node]
if !exists {
return false
}
for i := range atxids {
if atxids[i] == atx {
return true
}
}
return false
}
Loading