Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Fix/6041 bug #6053

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 56 additions & 25 deletions activation/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ var (

// PoetConfig is the configuration to interact with the poet server.
type PoetConfig struct {
PhaseShift time.Duration `mapstructure:"phase-shift"`
CycleGap time.Duration `mapstructure:"cycle-gap"`
GracePeriod time.Duration `mapstructure:"grace-period"`
RequestTimeout time.Duration `mapstructure:"poet-request-timeout"`
RequestRetryDelay time.Duration `mapstructure:"retry-delay"`
MaxRequestRetries int `mapstructure:"retry-max"`
PhaseShift time.Duration `mapstructure:"phase-shift"`
CycleGap time.Duration `mapstructure:"cycle-gap"`
GracePeriod time.Duration `mapstructure:"grace-period"`
RequestTimeout time.Duration `mapstructure:"poet-request-timeout"`
RequestRetryDelay time.Duration `mapstructure:"retry-delay"`
PositioningATXSelectionTimeout time.Duration `mapstructure:"positioning-atx-selection-timeout"`
MaxRequestRetries int `mapstructure:"retry-max"`
}

func DefaultPoetConfig() PoetConfig {
Expand Down Expand Up @@ -203,6 +204,7 @@ func NewBuilder(
for _, opt := range opts {
opt(b)
}

return b
}

Expand Down Expand Up @@ -563,6 +565,15 @@ func (b *Builder) BuildNIPostChallenge(ctx context.Context, nodeID types.NodeID)
}
}

poetStartsAt := b.poetRoundStart(current)
if b.poetCfg.PositioningATXSelectionTimeout > 0 {
var cancel context.CancelFunc

deadline := poetStartsAt.Add(-b.poetCfg.GracePeriod).Add(b.poetCfg.PositioningATXSelectionTimeout)
ctx, cancel = context.WithDeadline(ctx, deadline)
defer cancel()
}

prevAtx, err = b.GetPrevAtx(nodeID)
switch {
case errors.Is(err, sql.ErrNotFound):
Expand All @@ -585,6 +596,7 @@ func (b *Builder) BuildNIPostChallenge(ctx context.Context, nodeID types.NodeID)
}
return nil, fmt.Errorf("initial POST is invalid: %w", err)
}

posAtx, err := b.getPositioningAtx(ctx, nodeID, publish, nil)
if err != nil {
return nil, fmt.Errorf("failed to get positioning ATX: %w", err)
Expand All @@ -604,7 +616,6 @@ func (b *Builder) BuildNIPostChallenge(ctx context.Context, nodeID types.NodeID)
case err != nil:
return nil, fmt.Errorf("get last ATX: %w", err)
default:
// regular ATX challenge
posAtx, err := b.getPositioningAtx(ctx, nodeID, publish, prevAtx)
if err != nil {
return nil, fmt.Errorf("failed to get positioning ATX: %w", err)
Expand Down Expand Up @@ -849,10 +860,13 @@ func (b *Builder) searchPositioningAtx(
ctx context.Context,
nodeID types.NodeID,
publish types.EpochID,
previous *types.ActivationTx,
) (types.ATXID, error) {
logger := b.logger.With(log.ZShortStringer("smesherID", nodeID), zap.Uint32("publish epoch", publish.Uint32()))

b.posAtxFinder.finding.Lock()
defer b.posAtxFinder.finding.Unlock()

if found := b.posAtxFinder.found; found != nil && found.forPublish == publish {
logger.Debug("using cached positioning atx", log.ZShortStringer("atx_id", found.id))
return found.id, nil
Expand All @@ -862,7 +876,9 @@ func (b *Builder) searchPositioningAtx(
if err != nil {
return types.EmptyATXID, fmt.Errorf("get latest epoch: %w", err)
}

logger.Info("searching for positioning atx", zap.Uint32("latest_epoch", latestPublished.Uint32()))

// positioning ATX publish epoch must be lower than the publish epoch of built ATX
positioningAtxPublished := min(latestPublished, publish-1)
id, err := findFullyValidHighTickAtx(
Expand All @@ -875,11 +891,18 @@ func (b *Builder) searchPositioningAtx(
VerifyChainOpts.AssumeValidBefore(time.Now().Add(-b.postValidityDelay)),
VerifyChainOpts.WithTrustedID(nodeID),
VerifyChainOpts.WithLogger(b.logger),
VerifyChainOpts.PrioritizeCall(),
)
if err != nil {
logger.Info("search failed - using golden atx as positioning atx", zap.Error(err))
id = b.conf.GoldenATXID
if previous != nil {
acud marked this conversation as resolved.
Show resolved Hide resolved
id = previous.ID()
poszu marked this conversation as resolved.
Show resolved Hide resolved
logger.Info("search failed - using previous atx as positioning atx", zap.Error(err))
} else {
id = b.conf.GoldenATXID
logger.Info("search failed - using golden atx as positioning atx", zap.Error(err))
}
}

b.posAtxFinder.found = &struct {
id types.ATXID
forPublish types.EpochID
Expand All @@ -897,20 +920,15 @@ func (b *Builder) getPositioningAtx(
publish types.EpochID,
previous *types.ActivationTx,
) (types.ATXID, error) {
id, err := b.searchPositioningAtx(ctx, nodeID, publish)
id, err := b.searchPositioningAtx(ctx, nodeID, publish, previous)
if err != nil {
return types.EmptyATXID, err
}

if previous != nil {
switch {
case id == b.conf.GoldenATXID:
id = previous.ID()
case id != b.conf.GoldenATXID:
if candidate, err := atxs.Get(b.db, id); err == nil {
if previous.TickHeight() >= candidate.TickHeight() {
id = previous.ID()
}
if previous != nil && id != previous.ID() {
if candidate, err := atxs.Get(b.db, id); err == nil {
ConvallariaMaj marked this conversation as resolved.
Show resolved Hide resolved
if previous.TickHeight() >= candidate.TickHeight() {
id = previous.ID()
}
}
}
Expand Down Expand Up @@ -942,8 +960,7 @@ func (b *Builder) Regossip(ctx context.Context, nodeID types.NodeID) error {
}

func buildNipostChallengeStartDeadline(roundStart time.Time, gracePeriod time.Duration) time.Time {
jitter := randomDurationInRange(time.Duration(0), gracePeriod*maxNipostChallengeBuildJitter/100.0)
return roundStart.Add(jitter).Add(-gracePeriod)
return roundStart.Add(-gracePeriod)
}

func (b *Builder) version(publish types.EpochID) types.AtxVersion {
Expand All @@ -966,8 +983,17 @@ func findFullyValidHighTickAtx(
opts ...VerifyChainOption,
) (types.ATXID, error) {
var found *types.ATXID
atxdata.IterateHighTicksInEpoch(publish+1, func(id types.ATXID) bool {

// iterate trough epochs, to get first valid, not malicious ATX with the biggest height
atxdata.IterateHighTicksInEpoch(publish+1, func(id types.ATXID) (contSearch bool) {
logger.Info("found candidate for high-tick atx", log.ZShortStringer("id", id))

if ctx.Err() != nil {
return false
}

// verify ATX-candidate by getting their dependencies (previous Atx, positioning ATX etc.)
// and verifying PoST for every dependency
if err := validator.VerifyChain(ctx, id, goldenATXID, opts...); err != nil {
logger.Info("rejecting candidate for high-tick atx", zap.Error(err), log.ZShortStringer("id", id))
return true
Expand All @@ -976,8 +1002,13 @@ func findFullyValidHighTickAtx(
return false
})

if found != nil {
return *found, nil
if ctx.Err() != nil {
return types.ATXID{}, ctx.Err()
}
return types.ATXID{}, ErrNotFound

if found == nil {
return types.ATXID{}, ErrNotFound
}

return *found, nil
}
39 changes: 38 additions & 1 deletion activation/activation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1456,7 +1456,7 @@ func TestGetPositioningAtx(t *testing.T) {
prev.SetID(types.RandomATXID())

tab.mValidator.EXPECT().VerifyChain(gomock.Any(), atxInDb.ID(), tab.goldenATXID, gomock.Any())
found, err := tab.searchPositioningAtx(context.Background(), types.EmptyNodeID, 99)
found, err := tab.searchPositioningAtx(context.Background(), types.EmptyNodeID, 99, prev)
require.NoError(t, err)
require.Equal(t, atxInDb.ID(), found)

Expand All @@ -1471,6 +1471,43 @@ func TestGetPositioningAtx(t *testing.T) {
require.NoError(t, err)
require.Equal(t, prev.ID(), selected)
})
t.Run("prefers own previous or golded when positioning ATX selection timout expired", func(t *testing.T) {
tab := newTestBuilder(t, 1)

atxInDb := &types.ActivationTx{TickCount: 100}
atxInDb.SetID(types.RandomATXID())
require.NoError(t, atxs.Add(tab.db, atxInDb))
tab.atxsdata.AddFromAtx(atxInDb, false)

prev := &types.ActivationTx{TickCount: 90}
prev.SetID(types.RandomATXID())

// no timeout set up
tab.mValidator.EXPECT().VerifyChain(gomock.Any(), atxInDb.ID(), tab.goldenATXID, gomock.Any())
found, err := tab.getPositioningAtx(context.Background(), types.EmptyNodeID, 99, prev)
require.NoError(t, err)
require.Equal(t, atxInDb.ID(), found)

tab.posAtxFinder.found = nil

// timeout set up, prev ATX exists
ctx, cancel := context.WithCancel(context.Background())
cancel()

selected, err := tab.getPositioningAtx(ctx, types.EmptyNodeID, 99, prev)
require.NoError(t, err)
require.Equal(t, prev.ID(), selected)

tab.posAtxFinder.found = nil

// timeout set up, prev ATX do not exists
ctx, cancel = context.WithCancel(context.Background())
cancel()

selected, err = tab.getPositioningAtx(ctx, types.EmptyNodeID, 99, nil)
require.NoError(t, err)
require.Equal(t, tab.goldenATXID, selected)
})
}

func TestFindFullyValidHighTickAtx(t *testing.T) {
Expand Down
29 changes: 28 additions & 1 deletion activation/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,40 @@ type AtxReceiver interface {

type PostVerifier interface {
io.Closer
Verify(ctx context.Context, p *shared.Proof, m *shared.ProofMetadata, opts ...verifying.OptionFunc) error
Verify(ctx context.Context, p *shared.Proof, m *shared.ProofMetadata, opts ...postVerifierOptionFunc) error
}

type scaler interface {
scale(int)
}

type postVerifierCallOption struct {
prioritised bool
verifierOptions []verifying.OptionFunc
}

type postVerifierOptionFunc func(*postVerifierCallOption)

func applyOptions(options ...postVerifierOptionFunc) postVerifierCallOption {
opts := postVerifierCallOption{}
for _, opt := range options {
opt(&opts)
}
return opts
}

func PrioritisedCall() postVerifierOptionFunc {
return func(o *postVerifierCallOption) {
o.prioritised = true
}
}

func WithVerifierOptions(ops ...verifying.OptionFunc) postVerifierOptionFunc {
return func(o *postVerifierCallOption) {
o.verifierOptions = ops
}
}

// validatorOption is a functional option type for the validator.
type validatorOption func(*validatorOptions)

Expand Down
2 changes: 1 addition & 1 deletion activation/malfeasance.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (mh *InvalidPostIndexHandler) Validate(ctx context.Context, data wire.Proof
ctx,
post,
meta,
verifying.SelectedIndex(int(proof.InvalidIdx)),
WithVerifierOptions(verifying.SelectedIndex(int(proof.InvalidIdx))),
); err != nil {
return atx.SmesherID, nil
}
Expand Down
38 changes: 23 additions & 15 deletions activation/post_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,12 @@ func (v *postVerifier) Verify(
_ context.Context,
p *shared.Proof,
m *shared.ProofMetadata,
opts ...verifying.OptionFunc,
opts ...postVerifierOptionFunc,
) error {
opt := applyOptions(opts...)

v.logger.Debug("verifying post", zap.Stringer("proof_node_id", types.BytesToNodeID(m.NodeId)))
return v.ProofVerifier.Verify(p, m, v.cfg, v.logger, opts...)
return v.ProofVerifier.Verify(p, m, v.cfg, v.logger, opt.verifierOptions...)
}

type postVerifierOpts struct {
Expand Down Expand Up @@ -296,31 +298,37 @@ func (v *offloadingPostVerifier) scale(target int) {
}
}

// Verify creates a Job from given parameters, adds to jobs queue (prioritized or not)
// and waits for result of Job execution.
func (v *offloadingPostVerifier) Verify(
ctx context.Context,
p *shared.Proof,
m *shared.ProofMetadata,
opts ...verifying.OptionFunc,
opts ...postVerifierOptionFunc,
) error {
opt := applyOptions(opts...)

job := &verifyPostJob{
ctx: ctx,
proof: p,
metadata: m,
opts: opts,
opts: opt.verifierOptions,
result: make(chan error, 1),
}

metrics.PostVerificationQueue.Inc()
defer metrics.PostVerificationQueue.Dec()

var jobChannel chan<- *verifyPostJob
_, prioritize := v.prioritizedIds[types.BytesToNodeID(m.NodeId)]
switch {
case prioritize:
v.log.Debug("prioritizing post verification", zap.Stringer("proof_node_id", types.BytesToNodeID(m.NodeId)))
jobChannel := v.jobs
if opt.prioritised {
v.log.Debug("prioritizing post verification call")
jobChannel = v.prioritized
default:
jobChannel = v.jobs
} else {
nodeID := types.BytesToNodeID(m.NodeId)
if _, prioritized := v.prioritizedIds[nodeID]; prioritized {
v.log.Debug("prioritizing post verification by Node ID", zap.Stringer("proof_node_id", nodeID))
jobChannel = v.prioritized
}
}

select {
Expand Down Expand Up @@ -365,17 +373,17 @@ func (w *postVerifierWorker) start() {
// First try to process a prioritized job.
select {
case job := <-w.prioritized:
job.result <- w.verifier.Verify(job.ctx, job.proof, job.metadata, job.opts...)
job.result <- w.verifier.Verify(job.ctx, job.proof, job.metadata, WithVerifierOptions(job.opts...))
ConvallariaMaj marked this conversation as resolved.
Show resolved Hide resolved
default:
select {
case <-w.shutdown:
return
case <-w.stop:
return
case job := <-w.prioritized:
job.result <- w.verifier.Verify(job.ctx, job.proof, job.metadata, job.opts...)
job.result <- w.verifier.Verify(job.ctx, job.proof, job.metadata, WithVerifierOptions(job.opts...))
case job := <-w.jobs:
job.result <- w.verifier.Verify(job.ctx, job.proof, job.metadata, job.opts...)
job.result <- w.verifier.Verify(job.ctx, job.proof, job.metadata, WithVerifierOptions(job.opts...))
}
}
}
Expand All @@ -387,7 +395,7 @@ func (v *noopPostVerifier) Verify(
_ context.Context,
_ *shared.Proof,
_ *shared.ProofMetadata,
_ ...verifying.OptionFunc,
_ ...postVerifierOptionFunc,
) error {
return nil
}
Expand Down
9 changes: 9 additions & 0 deletions activation/post_verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ func TestPostVerifierPrioritization(t *testing.T) {
err := v.Verify(context.Background(), &shared.Proof{}, &shared.ProofMetadata{NodeId: nodeID.Bytes()})
require.NoError(t, err)

verifier.EXPECT().
Verify(context.WithValue(context.Background(), prioritizedVerifyCall, true),
gomock.Any(), &shared.ProofMetadata{NodeId: nodeID.Bytes()}, gomock.Any()).
Return(nil)

err = v.Verify(context.WithValue(context.Background(), prioritizedVerifyCall, true),
&shared.Proof{}, &shared.ProofMetadata{NodeId: nodeID.Bytes()})
require.NoError(t, err)

verifier.EXPECT().Close().Return(nil)
require.NoError(t, v.Close())
}
Expand Down
Loading
Loading