Skip to content

Commit

Permalink
Merge pull request #5382 from spacemeshos/v1.2-avoid-processing-atx-m…
Browse files Browse the repository at this point in the history
…any-times

[backport] avoid processing same ATX in parallel (#5379)
  • Loading branch information
poszu authored Dec 21, 2023
2 parents 0e55663 + ad354f1 commit a115beb
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ See [RELEASE](./RELEASE.md) for workflow instructions.
* [#5373](https://github.com/spacemeshos/go-spacemesh/pull/5373) automatic scaling of post verifying workers to a lower value (1 by default) when POST proving starts.
The workers are scaled up when POST proving finishes.

* [#5382](https://github.com/spacemeshos/go-spacemesh/pull/5382) avoid processing same (gossiped/fetched) ATX many times in parallel

## Release v1.2.11

### Improvements
Expand Down
42 changes: 40 additions & 2 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ type Handler struct {
mu sync.Mutex
fetcher system.Fetcher
poetCfg PoetConfig

// inProgress map gathers ATXs that are currently being processed.
// It's used to avoid processing the same ATX twice.
inProgress map[types.ATXID][]chan error
inProgressMu sync.Mutex
}

// NewHandler returns a data handler for ATX.
Expand Down Expand Up @@ -79,6 +84,8 @@ func NewHandler(
beacon: beacon,
tortoise: tortoise,
poetCfg: poetCfg,

inProgress: make(map[types.ATXID][]chan error),
}
}

Expand Down Expand Up @@ -458,12 +465,43 @@ func (h *Handler) handleAtx(ctx context.Context, expHash types.Hash32, peer p2p.
if err := codec.Decode(msg, &atx); err != nil {
return fmt.Errorf("%w: %w", errMalformedData, err)
}

atx.SetReceived(receivedTime.Local())
if err := atx.Initialize(); err != nil {
return fmt.Errorf("failed to derive ID from atx: %w", err)
}

// Check if processing is already in progress
h.inProgressMu.Lock()
if sub, ok := h.inProgress[atx.ID()]; ok {
ch := make(chan error, 1)
h.inProgress[atx.ID()] = append(sub, ch)
h.inProgressMu.Unlock()
h.log.WithContext(ctx).With().Debug("atx is already being processed. waiting for result", atx.ID())
select {
case err := <-ch:
h.log.WithContext(ctx).With().Debug("atx processed in other task", atx.ID(), log.Err(err))
return err
case <-ctx.Done():
return ctx.Err()
}
}

h.inProgress[atx.ID()] = []chan error{}
h.inProgressMu.Unlock()
h.log.WithContext(ctx).With().Info("handling incoming atx", atx.ID(), log.Int("size", len(msg)))

err := h.processAtx(ctx, expHash, peer, atx)
h.inProgressMu.Lock()
defer h.inProgressMu.Unlock()
for _, ch := range h.inProgress[atx.ID()] {
ch <- err
close(ch)
}
delete(h.inProgress, atx.ID())
return err
}

func (h *Handler) processAtx(ctx context.Context, expHash types.Hash32, peer p2p.Peer, atx types.ActivationTx) error {
if !h.edVerifier.Verify(signing.ATX, atx.SmesherID, atx.SignedBytes(), atx.Signature) {
return fmt.Errorf("failed to verify atx signature: %w", errMalformedData)
}
Expand Down Expand Up @@ -495,7 +533,7 @@ func (h *Handler) handleAtx(ctx context.Context, expHash types.Hash32, peer p2p.
return fmt.Errorf("cannot process atx %v: %w", atx.ShortString(), err)
}
events.ReportNewActivation(vAtx)
h.log.WithContext(ctx).With().Info("new atx", log.Inline(vAtx), log.Int("size", len(msg)))
h.log.WithContext(ctx).With().Info("new atx", log.Inline(vAtx))
return nil
}

Expand Down
69 changes: 69 additions & 0 deletions activation/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
Expand Down Expand Up @@ -906,6 +907,74 @@ func TestHandler_HandleGossipAtx(t *testing.T) {
require.NoError(t, atxHdlr.HandleGossipAtx(context.Background(), "", secondData))
}

func TestHandler_HandleParallelGossipAtx(t *testing.T) {
goldenATXID := types.ATXID{2, 3, 4}
atxHdlr := newTestHandler(t, goldenATXID)

sig, err := signing.NewEdSigner()
require.NoError(t, err)
nodeID := sig.NodeID()
nipost := newNIPostWithChallenge(t, types.HexToHash32("0x3333"), []byte{0xba, 0xbe})
vrfNonce := types.VRFPostIndex(12345)
atx := &types.ActivationTx{
InnerActivationTx: types.InnerActivationTx{
NIPostChallenge: types.NIPostChallenge{
PublishEpoch: 1,
PrevATXID: types.EmptyATXID,
PositioningATX: goldenATXID,
CommitmentATX: &goldenATXID,
InitialPost: nipost.Post,
},
Coinbase: types.Address{2, 3, 4},
NumUnits: 2,
NIPost: nipost,
NodeID: &nodeID,
VRFNonce: &vrfNonce,
},
SmesherID: nodeID,
}
atx.Signature = sig.Sign(signing.ATX, atx.SignedBytes())
atx.SetEffectiveNumUnits(atx.NumUnits)
atx.SetReceived(time.Now())
_, err = atx.Verify(0, 2)
require.NoError(t, err)

atxData, err := codec.Encode(atx)
require.NoError(t, err)

atxHdlr.mclock.EXPECT().CurrentLayer().Return(atx.PublishEpoch.FirstLayer())
atxHdlr.mValidator.EXPECT().VRFNonce(nodeID, goldenATXID, &vrfNonce, gomock.Any(), atx.NumUnits)
atxHdlr.mValidator.EXPECT().Post(
gomock.Any(),
atx.SmesherID,
goldenATXID,
atx.InitialPost,
gomock.Any(),
atx.NumUnits,
).DoAndReturn(
func(_ context.Context, _ types.NodeID, _ types.ATXID, _ *types.Post, _ *types.PostMetadata, _ uint32) error {
time.Sleep(100 * time.Millisecond)
return nil
},
)
atxHdlr.mockFetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any())
atxHdlr.mockFetch.EXPECT().GetPoetProof(gomock.Any(), atx.GetPoetProofRef())
atxHdlr.mValidator.EXPECT().InitialNIPostChallenge(&atx.NIPostChallenge, gomock.Any(), goldenATXID)
atxHdlr.mValidator.EXPECT().PositioningAtx(goldenATXID, gomock.Any(), goldenATXID, atx.PublishEpoch)
atxHdlr.mValidator.EXPECT().NIPost(gomock.Any(), nodeID, goldenATXID, atx.NIPost, gomock.Any(), atx.NumUnits)
atxHdlr.mbeacon.EXPECT().OnAtx(gomock.Any())
atxHdlr.mtortoise.EXPECT().OnAtx(gomock.Any())

var eg errgroup.Group
for i := 0; i < 10; i++ {
eg.Go(func() error {
return atxHdlr.HandleGossipAtx(context.Background(), "", atxData)
})
}

require.NoError(t, eg.Wait())
}

func TestHandler_HandleSyncedAtx(t *testing.T) {
// Arrange
goldenATXID := types.ATXID{2, 3, 4}
Expand Down

0 comments on commit a115beb

Please sign in to comment.