Skip to content

Commit

Permalink
Try #5071:
Browse files Browse the repository at this point in the history
  • Loading branch information
bors[bot] authored Sep 25, 2023
2 parents ca91fc5 + 357eebb commit 69d927d
Show file tree
Hide file tree
Showing 28 changed files with 1,140 additions and 306 deletions.
21 changes: 21 additions & 0 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/spacemeshos/post/shared"
"golang.org/x/exp/maps"

"github.com/spacemeshos/go-spacemesh/cache"
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
Expand Down Expand Up @@ -40,6 +41,7 @@ type atxChan struct {
// Handler processes the atxs received from all nodes and their validity status.
type Handler struct {
cdb *datastore.CachedDB
cache *cache.Cache
edVerifier *signing.EdVerifier
clock layerClock
publisher pubsub.Publisher
Expand All @@ -58,6 +60,7 @@ type Handler struct {
// NewHandler returns a data handler for ATX.
func NewHandler(
cdb *datastore.CachedDB,
cache *cache.Cache,
edVerifier *signing.EdVerifier,
c layerClock,
pub pubsub.Publisher,
Expand All @@ -72,6 +75,7 @@ func NewHandler(
) *Handler {
return &Handler{
cdb: cdb,
cache: cache,
edVerifier: edVerifier,
clock: c,
publisher: pub,
Expand Down Expand Up @@ -356,6 +360,22 @@ func (h *Handler) ContextuallyValidateAtx(atx *types.VerifiedActivationTx) error
return err
}

func (h *Handler) cacheAtx(ctx context.Context, atx *types.ActivationTxHeader) {
if !h.cache.IsEvicted(atx.TargetEpoch()) {
nonce, err := h.cdb.VRFNonce(atx.NodeID, atx.TargetEpoch())
if err != nil {
h.log.With().Error("failed vrf nonce read", log.Err(err), log.Context(ctx))
return
}
malicious, err := h.cdb.IsMalicious(atx.NodeID)
if err != nil {
h.log.With().Error("failed is malicious read", log.Err(err), log.Context(ctx))
return
}
h.cache.Add(atx.TargetEpoch(), atx.NodeID, atx.ID, cache.ToATXData(atx, nonce, malicious))
}
}

// storeAtx stores an ATX and notifies subscribers of the ATXID.
func (h *Handler) storeAtx(ctx context.Context, atx *types.VerifiedActivationTx) error {
malicious, err := h.cdb.IsMalicious(atx.SmesherID)
Expand Down Expand Up @@ -421,6 +441,7 @@ func (h *Handler) storeAtx(ctx context.Context, atx *types.VerifiedActivationTx)
}
h.beacon.OnAtx(header)
h.tortoise.OnAtx(header.ToData())
h.cacheAtx(ctx, header)

// notify subscribers
if ch, found := h.atxChannels[atx.ID()]; found {
Expand Down
3 changes: 2 additions & 1 deletion activation/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/spacemeshos/go-spacemesh/cache"
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
Expand Down Expand Up @@ -101,7 +102,7 @@ func newTestHandler(tb testing.TB, goldenATXID types.ATXID) *testHandler {
mbeacon := NewMockAtxReceiver(ctrl)
mtortoise := mocks.NewMockTortoise(ctrl)

atxHdlr := NewHandler(cdb, verifier, mclock, mpub, mockFetch, 1, goldenATXID, mValidator, mbeacon, mtortoise, lg, PoetConfig{})
atxHdlr := NewHandler(cdb, cache.New(), verifier, mclock, mpub, mockFetch, 1, goldenATXID, mValidator, mbeacon, mtortoise, lg, PoetConfig{})
return &testHandler{
Handler: atxHdlr,

Expand Down
204 changes: 204 additions & 0 deletions cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package cache

import (
"bytes"
"sort"
"sync"

"go.uber.org/atomic"

"github.com/spacemeshos/go-spacemesh/common/types"
)

type ATXData struct {
Weight uint64
BaseHeight, Height uint64
Nonce types.VRFPostIndex
Malicious bool
}

type Opt func(*Cache)

// WithCapacity sets the number of epochs from the latest applied
// that cache will maintain in memory.
func WithCapacity(capacity types.EpochID) Opt {
return func(cache *Cache) {
cache.capacity = capacity
}
}

func New(opts ...Opt) *Cache {
cache := &Cache{
capacity: 2,
epochs: map[types.EpochID]epochCache{},
}
for _, opt := range opts {
opt(cache)
}
return cache
}

type Cache struct {
evicted atomic.Uint32

// number of epochs to keep
// capacity is enforced by the cache itself
capacity types.EpochID

mu sync.RWMutex
epochs map[types.EpochID]epochCache
}

type epochCache struct {
atxs map[types.ATXID]*ATXData
identities map[types.NodeID][]types.ATXID
}

func (c *Cache) Evicted() types.EpochID {
return types.EpochID(c.evicted.Load())
}

func (c *Cache) IsEvicted(epoch types.EpochID) bool {
return c.evicted.Load() >= epoch.Uint32()
}

// OnApplied is a notification for cache to evict epochs that are not useful
// to keep in memory.
func (c *Cache) OnApplied(applied types.EpochID) {
if applied < c.capacity {
return
}
evict := applied - c.capacity
if c.IsEvicted(evict) {
return
}
c.mu.Lock()
defer c.mu.Unlock()
if c.evicted.Load() < applied.Uint32() {
c.evicted.Store(evict.Uint32())
}
for epoch := range c.epochs {
if epoch <= evict {
delete(c.epochs, epoch)
atxsCounter.DeleteLabelValues(epoch.String())
identitiesCounter.DeleteLabelValues(epoch.String())
}
}
}

func (c *Cache) Add(epoch types.EpochID, node types.NodeID, atx types.ATXID, data *ATXData) {
if c.IsEvicted(epoch) {
return
}
c.mu.Lock()
defer c.mu.Unlock()
ecache, exists := c.epochs[epoch]
if !exists {
ecache = epochCache{
atxs: map[types.ATXID]*ATXData{},
identities: map[types.NodeID][]types.ATXID{},
}
c.epochs[epoch] = ecache
}
if _, exists := ecache.atxs[atx]; exists {
return
}
ecache.atxs[atx] = data
atxs := ecache.identities[node]

atxsCounter.WithLabelValues(epoch.String()).Add(1)
if atxs == nil {
identitiesCounter.WithLabelValues(epoch.String()).Add(1)
}

atxs = append(atxs, atx)
// NOTE(dshulyak) doesn't make sense, as we don't guarantee that every node
// will see same atxs. it actually should see atmost one and malfeasence proof if node equivocated.
// find out if we have use case in consensus.
sort.Slice(atxs, func(i, j int) bool {
return bytes.Compare(atxs[i].Bytes(), atxs[j].Bytes()) == -1
})
ecache.identities[node] = atxs
}

func (c *Cache) SetMalicious(node types.NodeID) {
c.mu.Lock()
defer c.mu.Unlock()
for _, ecache := range c.epochs {
for _, atx := range ecache.identities[node] {
data := ecache.atxs[atx]
// copy on update instead of copying on read
updated := *data
updated.Malicious = true
ecache.atxs[atx] = &updated
}
}
}

// Get returns atx data.
func (c *Cache) Get(epoch types.EpochID, atx types.ATXID) *ATXData {
c.mu.RLock()
defer c.mu.RUnlock()
ecache, exists := c.epochs[epoch]
if !exists {
return nil
}
data, exists := ecache.atxs[atx]
if !exists {
return nil
}
return data
}

// GetByNode returns atx data of the first atx in lexicographic order.
func (c *Cache) GetByNode(epoch types.EpochID, node types.NodeID) *ATXData {
c.mu.RLock()
defer c.mu.RUnlock()
ecache, exists := c.epochs[epoch]
if !exists {
return nil
}
atxids, exists := ecache.identities[node]
if !exists {
return nil
}
return ecache.atxs[atxids[0]]
}

// List all known atxs.
func (c *Cache) List(epoch types.EpochID) []types.ATXID {
if c.IsEvicted(epoch) {
return nil
}
c.mu.RLock()
defer c.mu.RUnlock()
ecache, exists := c.epochs[epoch]
if !exists {
return nil
}
ids := make([]types.ATXID, 0, len(ecache.atxs))
for id := range ecache.atxs {
ids = append(ids, id)
}
return ids
}

// NodeHasAtx returns true if atx was registered with a given node id.
func (c *Cache) NodeHasAtx(epoch types.EpochID, node types.NodeID, atx types.ATXID) bool {
c.mu.RLock()
defer c.mu.RUnlock()
ecache, exists := c.epochs[epoch]
if !exists {
return false
}
atxids, exists := ecache.identities[node]
if !exists {
return false
}
for i := range atxids {
if atxids[i] == atx {
return true
}
}
return false
}
Loading

0 comments on commit 69d927d

Please sign in to comment.