Skip to content

Commit

Permalink
Merge branch 'develop' into consensus-cache
Browse files Browse the repository at this point in the history
  • Loading branch information
dshulyak authored Oct 23, 2023
2 parents 15644cf + 88f37a7 commit 6687f4b
Show file tree
Hide file tree
Showing 18 changed files with 282 additions and 212 deletions.
120 changes: 64 additions & 56 deletions blocks/certifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"sync/atomic"

"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/codec"
Expand Down Expand Up @@ -83,8 +84,7 @@ type Certifier struct {

db *datastore.CachedDB
oracle hare.Rolacle
nodeID types.NodeID
signer *signing.EdSigner
signers map[types.NodeID]*signing.EdSigner
edVerifier *signing.EdVerifier
publisher pubsub.Publisher
layerClock layerClock
Expand All @@ -102,8 +102,7 @@ type Certifier struct {
func NewCertifier(
db *datastore.CachedDB,
o hare.Rolacle,
n types.NodeID,
s *signing.EdSigner,

v *signing.EdVerifier,
p pubsub.Publisher,
lc layerClock,
Expand All @@ -116,8 +115,7 @@ func NewCertifier(
cfg: defaultCertConfig(),
db: db,
oracle: o,
nodeID: n,
signer: s,
signers: make(map[types.NodeID]*signing.EdSigner),
edVerifier: v,
publisher: p,
layerClock: lc,
Expand All @@ -134,6 +132,18 @@ func NewCertifier(
return c
}

func (c *Certifier) Register(s *signing.EdSigner) {
c.mu.Lock()
defer c.mu.Unlock()
if _, exists := c.signers[s.NodeID()]; exists {
c.logger.With().Error("signing key already registered", log.ShortStringer("id", s.NodeID()))
return
}

c.logger.With().Info("registered signing key", log.ShortStringer("id", s.NodeID()))
c.signers[s.NodeID()] = s
}

// Start starts the background goroutine for periodic pruning.
func (c *Certifier) Start(ctx context.Context) {
c.once.Do(func() {
Expand Down Expand Up @@ -185,7 +195,7 @@ func (c *Certifier) prune() {
}
}

func (c *Certifier) createIfNeeded(lid types.LayerID, bid types.BlockID) {
func (c *Certifier) createIfNeeded(lid types.LayerID, bid types.BlockID) *certInfo {
if _, ok := c.certifyMsgs[lid]; !ok {
c.certifyMsgs[lid] = make(map[types.BlockID]*certInfo)
}
Expand All @@ -194,6 +204,7 @@ func (c *Certifier) createIfNeeded(lid types.LayerID, bid types.BlockID) {
signatures: make([]types.CertifyMessage, 0, c.cfg.CommitteeSize),
}
}
return c.certifyMsgs[lid][bid]
}

// RegisterForCert register to generate a certificate for the specified layer/block.
Expand All @@ -203,52 +214,61 @@ func (c *Certifier) RegisterForCert(ctx context.Context, lid types.LayerID, bid

c.mu.Lock()
defer c.mu.Unlock()
c.createIfNeeded(lid, bid)
c.certifyMsgs[lid][bid].registered = true
return c.tryGenCert(ctx, logger, lid, bid)
info := c.createIfNeeded(lid, bid)
info.registered = true
return c.tryGenCert(ctx, logger, lid, bid, info)
}

// CertifyIfEligible signs the hare output, along with its role proof as a certifier, and gossip the CertifyMessage
// if the node is eligible to be a certifier.
func (c *Certifier) CertifyIfEligible(ctx context.Context, logger log.Log, lid types.LayerID, bid types.BlockID) error {
if _, err := c.beacon.GetBeacon(lid.GetEpoch()); err != nil {
func (c *Certifier) CertifyIfEligible(ctx context.Context, lid types.LayerID, bid types.BlockID) error {
beacon, err := c.beacon.GetBeacon(lid.GetEpoch())
if err != nil {
return errBeaconNotAvailable
}
// check if the node is eligible to certify the hare output
proof, err := c.oracle.Proof(ctx, lid, eligibility.CertifyRound)
if err != nil {
logger.With().Error("failed to get eligibility proof to certify", log.Err(err))
return err

c.mu.Lock()
signers := maps.Values(c.signers)
c.mu.Unlock()

var errs error
for _, s := range signers {
if err := c.certifySingleSigner(ctx, s, lid, bid, beacon); err != nil {
errs = errors.Join(errs, fmt.Errorf("certifying block %v/%v by %s: %w", lid, bid, s.NodeID().ShortString(), err))
}
}
return errs
}

eligibilityCount, err := c.oracle.CalcEligibility(ctx, lid, eligibility.CertifyRound, c.cfg.CommitteeSize, c.nodeID, proof)
func (c *Certifier) certifySingleSigner(ctx context.Context, s *signing.EdSigner, lid types.LayerID, bid types.BlockID, beacon types.Beacon) error {
proof := eligibility.GenVRF(context.Background(), s.VRFSigner(), beacon, lid, eligibility.CertifyRound)
eligibilityCount, err := c.oracle.CalcEligibility(ctx, lid, eligibility.CertifyRound, c.cfg.CommitteeSize, s.NodeID(), proof)
if err != nil {
return err
return fmt.Errorf("calculating eligibility: %w", err)
}
if eligibilityCount == 0 { // not eligible
return nil
}

msg := types.CertifyMessage{
msg := newCertifyMsg(s, lid, bid, proof, eligibilityCount)
if err = c.publisher.Publish(ctx, pubsub.BlockCertify, codec.MustEncode(msg)); err != nil {
return fmt.Errorf("publishing block certification message: %w", err)
}
return nil
}

func newCertifyMsg(s *signing.EdSigner, lid types.LayerID, bid types.BlockID, proof types.VrfSignature, eligibility uint16) *types.CertifyMessage {
msg := &types.CertifyMessage{
CertifyContent: types.CertifyContent{
LayerID: lid,
BlockID: bid,
EligibilityCnt: eligibilityCount,
EligibilityCnt: eligibility,
Proof: proof,
},
SmesherID: c.nodeID,
SmesherID: s.NodeID(),
}
msg.Signature = c.signer.Sign(signing.HARE, msg.Bytes())
data, err := codec.Encode(&msg)
if err != nil {
logger.With().Panic("failed to serialize certify message", log.Err(err))
return err
}
if err = c.publisher.Publish(ctx, pubsub.BlockCertify, data); err != nil {
logger.With().Error("failed to send certify message", log.Err(err))
return err
}
return nil
msg.Signature = s.Sign(signing.HARE, msg.Bytes())
return msg
}

// NumCached returns the number of layers being cached in memory.
Expand Down Expand Up @@ -384,35 +404,27 @@ func (c *Certifier) saveMessage(ctx context.Context, logger log.Log, msg types.C

lid := msg.LayerID
bid := msg.BlockID
c.createIfNeeded(lid, bid)
info := c.createIfNeeded(lid, bid)

c.certifyMsgs[lid][bid].signatures = append(c.certifyMsgs[lid][bid].signatures, msg)
c.certifyMsgs[lid][bid].totalEligibility += msg.EligibilityCnt
info.signatures = append(info.signatures, msg)
info.totalEligibility += msg.EligibilityCnt
logger.With().Debug("saved certify msg",
log.Uint16("eligibility_count", c.certifyMsgs[lid][bid].totalEligibility),
log.Int("num_msg", len(c.certifyMsgs[lid][bid].signatures)),
log.Uint16("eligibility_count", info.totalEligibility),
log.Int("num_msg", len(info.signatures)),
)

if c.certifyMsgs[lid][bid].registered {
return c.tryGenCert(ctx, logger, lid, bid)
if info.registered {
return c.tryGenCert(ctx, logger, lid, bid, info)
}
return nil
}

func (c *Certifier) tryGenCert(ctx context.Context, logger log.Log, lid types.LayerID, bid types.BlockID) error {
if _, ok := c.certifyMsgs[lid]; !ok {
logger.Fatal("missing layer in cache")
}
if _, ok := c.certifyMsgs[lid][bid]; !ok {
logger.Fatal("missing block in cache")
}

if c.certifyMsgs[lid][bid].done ||
c.certifyMsgs[lid][bid].totalEligibility < uint16(c.cfg.CertifyThreshold) {
func (c *Certifier) tryGenCert(ctx context.Context, logger log.Log, lid types.LayerID, bid types.BlockID, info *certInfo) error {
if info.done || info.totalEligibility < uint16(c.cfg.CertifyThreshold) {
return nil
}

if !c.certifyMsgs[lid][bid].registered {
if !info.registered {
// do not try to generate a certificate for this block.
// wait for syncer to download from peers
return nil
Expand Down Expand Up @@ -487,11 +499,7 @@ func (c *Certifier) addCertCount(epoch types.EpochID) {
func (c *Certifier) CertCount() map[types.EpochID]int {
c.mu.Lock()
defer c.mu.Unlock()
result := map[types.EpochID]int{}
for epoch, count := range c.certCount {
result[epoch] = count
}
return result
return maps.Clone(c.certCount)
}

func (c *Certifier) save(ctx context.Context, lid types.LayerID, cert *types.Certificate, valid, invalid []types.BlockID) error {
Expand Down
Loading

0 comments on commit 6687f4b

Please sign in to comment.