diff --git a/lib/babe/babe.go b/lib/babe/babe.go index 3410029c37..7916c716b0 100644 --- a/lib/babe/babe.go +++ b/lib/babe/babe.go @@ -420,7 +420,9 @@ func (b *Service) handleEpoch(epoch uint64) (next uint64, err error) { case err := <-errCh: // TODO: errEpochPast is sent on this channel, but it doesnot get logged here epochTimer.Stop() - logger.Errorf("error from epochHandler: %s", err) + if err != nil { + logger.Errorf("error from epochHandler: %s", err) + } } // setup next epoch, re-invoke block authoring diff --git a/lib/babe/epoch_handler.go b/lib/babe/epoch_handler.go index 0bb6dc657e..52dc8c9132 100644 --- a/lib/babe/epoch_handler.go +++ b/lib/babe/epoch_handler.go @@ -7,12 +7,9 @@ import ( "context" "errors" "fmt" - "sort" - "time" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/crypto/sr25519" - "golang.org/x/exp/maps" ) type handleSlotFunc = func(epoch uint64, slot Slot, authorityIndex uint32, @@ -23,6 +20,7 @@ var ( ) type epochHandler struct { + slotHandler slotHandler epochNumber uint64 firstSlot uint64 @@ -53,6 +51,7 @@ func newEpochHandler(epochNumber, firstSlot uint64, epochData *epochData, consta } return &epochHandler{ + slotHandler: newSlotHandler(constants.slotDuration), epochNumber: epochNumber, firstSlot: firstSlot, constants: constants, @@ -62,7 +61,10 @@ func newEpochHandler(epochNumber, firstSlot uint64, epochData *epochData, consta }, nil } +// run executes the block production for each available successfully claimed slot +// it is important to note that any error will be transmitted through errCh func (h *epochHandler) run(ctx context.Context, errCh chan<- error) { + defer close(errCh) currSlot := getCurrentSlot(h.constants.slotDuration) // if currSlot < h.firstSlot, it means we're at genesis and waiting for the first slot to arrive. @@ -75,83 +77,24 @@ func (h *epochHandler) run(ctx context.Context, errCh chan<- error) { return } - // for each slot we're handling, create a timer that will fire when it starts - // we create timers only for slots where we're authoring - authoringSlots := getAuthoringSlots(h.slotToPreRuntimeDigest) + logger.Debugf("authoring in %d slots in epoch %d", len(h.slotToPreRuntimeDigest), h.epochNumber) - type slotWithTimer struct { - startTime time.Time - timer *time.Timer - slotNum uint64 - } + for { + currentSlot, err := h.slotHandler.waitForNextSlot(ctx) + if err != nil { + errCh <- err + return + } - slotTimeTimers := make([]*slotWithTimer, 0, len(authoringSlots)) - for _, authoringSlot := range authoringSlots { - if authoringSlot < currSlot { - // ignore slots already passed + // check if the slot is an authoring slot otherwise wait for the next slot + preRuntimeDigest, has := h.slotToPreRuntimeDigest[currentSlot.number] + if !has { continue } - startTime := getSlotStartTime(authoringSlot, h.constants.slotDuration) - waitTime := time.Until(startTime) - timer := time.NewTimer(waitTime) - - slotTimeTimers = append(slotTimeTimers, &slotWithTimer{ - timer: timer, - slotNum: authoringSlot, - startTime: startTime, - }) - - logger.Debugf("start time of slot %d: %v", authoringSlot, startTime) - } - - logger.Debugf("authoring in %d slots in epoch %d", len(slotTimeTimers), h.epochNumber) - - for _, swt := range slotTimeTimers { - logger.Debugf("waiting for next authoring slot %d", swt.slotNum) - - select { - case <-ctx.Done(): - for _, swt := range slotTimeTimers { - swt.timer.Stop() - } - return - case <-swt.timer.C: - // we must do a time correction as the slot timer sometimes is triggered - // before the time defined in the constructor due to an inconsistency - // of the language -> https://github.com/golang/go/issues/17696 - - diff := time.Since(swt.startTime) - if diff < 0 { - time.Sleep(-diff) - } - - if _, has := h.slotToPreRuntimeDigest[swt.slotNum]; !has { - // this should never happen - panic(fmt.Sprintf("no VRF proof for authoring slot! slot=%d", swt.slotNum)) - } - - currentSlot := Slot{ - start: swt.startTime, - duration: h.constants.slotDuration, - number: swt.slotNum, - } - err := h.handleSlot(h.epochNumber, currentSlot, h.epochData.authorityIndex, h.slotToPreRuntimeDigest[swt.slotNum]) - if err != nil { - logger.Warnf("failed to handle slot %d: %s", swt.slotNum, err) - continue - } + err = h.handleSlot(h.epochNumber, currentSlot, h.epochData.authorityIndex, preRuntimeDigest) + if err != nil { + logger.Warnf("failed to handle slot %d: %s", currentSlot.number, err) } } } - -// getAuthoringSlots returns an ordered slice of slot numbers where we can author blocks, -// based on the given VRF output and proof map. -func getAuthoringSlots(slotToPreRuntimeDigest map[uint64]*types.PreRuntimeDigest) []uint64 { - authoringSlots := maps.Keys(slotToPreRuntimeDigest) - sort.Slice(authoringSlots, func(i, j int) bool { - return authoringSlots[i] < authoringSlots[j] - }) - - return authoringSlots -} diff --git a/lib/babe/epoch_handler_integration_test.go b/lib/babe/epoch_handler_integration_test.go new file mode 100644 index 0000000000..bb8eac33a5 --- /dev/null +++ b/lib/babe/epoch_handler_integration_test.go @@ -0,0 +1,118 @@ +// Copyright 2023 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +//go:build integration + +package babe + +import ( + "context" + "testing" + "time" + + "github.com/ChainSafe/gossamer/dot/types" + "github.com/ChainSafe/gossamer/lib/crypto/sr25519" + "github.com/ChainSafe/gossamer/pkg/scale" + "github.com/stretchr/testify/require" +) + +func TestEpochHandler_run_shouldReturnAfterContextCancel(t *testing.T) { + t.Parallel() + + const authorityIndex uint32 = 0 + aliceKeyPair := keyring.Alice().(*sr25519.Keypair) + epochData := &epochData{ + threshold: scale.MaxUint128, + authorityIndex: authorityIndex, + authorities: []types.Authority{ + *types.NewAuthority(aliceKeyPair.Public(), 1), + }, + } + + const slotDuration = 6 * time.Second + const epochLength uint64 = 100 + + testConstants := constants{ + slotDuration: slotDuration, + epochLength: epochLength, + } + + const expectedEpoch = 1 + startSlot := getCurrentSlot(slotDuration) + handler := testHandleSlotFunc(t, authorityIndex, expectedEpoch, startSlot) + + epochHandler, err := newEpochHandler(1, startSlot, epochData, testConstants, handler, aliceKeyPair) + require.NoError(t, err) + require.Equal(t, epochLength, uint64(len(epochHandler.slotToPreRuntimeDigest))) + + timeoutCtx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(7 * time.Second) + cancel() + }() + + errCh := make(chan error) + go epochHandler.run(timeoutCtx, errCh) + + err = <-errCh + require.ErrorIs(t, err, context.Canceled) +} + +func TestEpochHandler_run(t *testing.T) { + t.Parallel() + + const authorityIndex uint32 = 0 + aliceKeyPair := keyring.Alice().(*sr25519.Keypair) + epochData := &epochData{ + threshold: scale.MaxUint128, + authorityIndex: authorityIndex, + authorities: []types.Authority{ + *types.NewAuthority(aliceKeyPair.Public(), 1), + }, + } + + const slotDuration = 6 * time.Second + const epochLength uint64 = 100 + + testConstants := constants{ + slotDuration: slotDuration, + epochLength: epochLength, + } + + const expectedEpoch = 1 + startSlot := getCurrentSlot(slotDuration) + handler := testHandleSlotFunc(t, authorityIndex, expectedEpoch, startSlot) + + epochHandler, err := newEpochHandler(1, startSlot, epochData, testConstants, handler, aliceKeyPair) + require.NoError(t, err) + require.Equal(t, epochLength, uint64(len(epochHandler.slotToPreRuntimeDigest))) + + timeoutCtx, cancel := context.WithTimeout(context.Background(), 10*slotDuration) + defer cancel() + + errCh := make(chan error) + go epochHandler.run(timeoutCtx, errCh) + + err = <-errCh + require.ErrorIs(t, err, context.DeadlineExceeded) + +} + +func testHandleSlotFunc(t *testing.T, expectedAuthorityIndex uint32, + expectedEpoch, startSlot uint64) handleSlotFunc { + currentSlot := startSlot + + return func(epoch uint64, slot Slot, authorityIndex uint32, + preRuntimeDigest *types.PreRuntimeDigest) error { + require.NotNil(t, preRuntimeDigest) + require.Equal(t, expectedEpoch, epoch) + require.Equal(t, expectedAuthorityIndex, authorityIndex) + + require.GreaterOrEqual(t, slot.number, currentSlot) + + // increase the slot by one so we expect the next call + // to be exactly 1 slot greater than the previous call + currentSlot++ + return nil + } +} diff --git a/lib/babe/epoch_handler_test.go b/lib/babe/epoch_handler_test.go index 8918bea666..11d62cf112 100644 --- a/lib/babe/epoch_handler_test.go +++ b/lib/babe/epoch_handler_test.go @@ -4,7 +4,6 @@ package babe import ( - "context" "testing" "time" @@ -29,70 +28,19 @@ func TestNewEpochHandler(t *testing.T) { sd, err := time.ParseDuration("6s") require.NoError(t, err) - constants := constants{ //nolint:govet + testConstants := constants{ slotDuration: sd, epochLength: 200, } keypair := keyring.Alice().(*sr25519.Keypair) - epochHandler, err := newEpochHandler(1, 9999, epochData, constants, testHandleSlotFunc, keypair) + epochHandler, err := newEpochHandler(1, 9999, epochData, testConstants, testHandleSlotFunc, keypair) require.NoError(t, err) require.Equal(t, 200, len(epochHandler.slotToPreRuntimeDigest)) require.Equal(t, uint64(1), epochHandler.epochNumber) require.Equal(t, uint64(9999), epochHandler.firstSlot) - require.Equal(t, constants, epochHandler.constants) + require.Equal(t, testConstants, epochHandler.constants) require.Equal(t, epochData, epochHandler.epochData) require.NotNil(t, epochHandler.handleSlot) } - -func TestEpochHandler_run(t *testing.T) { - sd, err := time.ParseDuration("10ms") - require.NoError(t, err) - startSlot := getCurrentSlot(sd) - - var callsToHandleSlot, firstExecutedSlot uint64 - testHandleSlotFunc := func(epoch uint64, slot Slot, authorityIndex uint32, - preRuntimeDigest *types.PreRuntimeDigest, - ) error { - require.Equal(t, uint64(1), epoch) - if callsToHandleSlot == 0 { - firstExecutedSlot = slot.number - } else { - require.Equal(t, firstExecutedSlot+callsToHandleSlot, slot.number) - } - require.Equal(t, uint32(0), authorityIndex) - require.NotNil(t, preRuntimeDigest) - callsToHandleSlot++ - return nil - } - - epochData := &epochData{ - threshold: scale.MaxUint128, - } - - const epochLength uint64 = 100 - constants := constants{ //nolint:govet - slotDuration: sd, - epochLength: epochLength, - } - - keypair := keyring.Alice().(*sr25519.Keypair) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - epochHandler, err := newEpochHandler(1, startSlot, epochData, constants, testHandleSlotFunc, keypair) - require.NoError(t, err) - require.Equal(t, epochLength, uint64(len(epochHandler.slotToPreRuntimeDigest))) - - errCh := make(chan error) - go epochHandler.run(ctx, errCh) - timer := time.NewTimer(sd * time.Duration(epochLength)) - select { - case <-timer.C: - require.Equal(t, epochLength-(firstExecutedSlot-startSlot), callsToHandleSlot) - case err := <-errCh: - timer.Stop() - require.NoError(t, err) - } -} diff --git a/lib/babe/slot.go b/lib/babe/slot.go new file mode 100644 index 0000000000..0a1d65b479 --- /dev/null +++ b/lib/babe/slot.go @@ -0,0 +1,86 @@ +// Copyright 2023 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package babe + +import ( + "context" + "fmt" + "time" +) + +// timeUntilNextSlot calculates, based on the current system time, the remainng +// time to the next slot +func timeUntilNextSlot(slotDuration time.Duration) time.Duration { + now := time.Now().UnixNano() + slotDurationInNano := slotDuration.Nanoseconds() + + nextSlot := (now + slotDurationInNano) / slotDurationInNano + + remaining := nextSlot*slotDurationInNano - now + return time.Duration(remaining) +} + +type slotHandler struct { + slotDuration time.Duration + lastSlot *Slot +} + +func newSlotHandler(slotDuration time.Duration) slotHandler { + return slotHandler{ + slotDuration: slotDuration, + } +} + +// waitForNextSlot returns a new Slot greater than the last one when a new slot starts +// based on the current system time similar to: +// https://github.com/paritytech/substrate/blob/fbddfbd76c60c6fda0024e8a44e82ad776033e4b/client/consensus/slots/src/slots.rs#L125 +func (s *slotHandler) waitForNextSlot(ctx context.Context) (Slot, error) { + for { + // check if there is enough time to collaborate + untilNextSlot := timeUntilNextSlot(s.slotDuration) + oneThirdSlotDuration := s.slotDuration / 3 + if untilNextSlot <= oneThirdSlotDuration { + err := waitUntilNextSlot(ctx, untilNextSlot) + if err != nil { + return Slot{}, fmt.Errorf("waiting next slot: %w", err) + } + } + + currentSystemTime := time.Now() + currentSlotNumber := uint64(currentSystemTime.UnixNano()) / uint64(s.slotDuration.Nanoseconds()) + currentSlot := Slot{ + start: currentSystemTime, + duration: s.slotDuration, + number: currentSlotNumber, + } + + // Never yield the same slot twice + if s.lastSlot == nil || currentSlot.number > s.lastSlot.number { + s.lastSlot = ¤tSlot + return currentSlot, nil + } + + err := waitUntilNextSlot(ctx, untilNextSlot) + if err != nil { + return Slot{}, fmt.Errorf("waiting next slot: %w", err) + } + } +} + +// waitUntilNextSlot is a blocking function that uses context.WithTimeout +// to "sleep", however if the parent context is canceled it releases with +// context.Canceled error +func waitUntilNextSlot(ctx context.Context, untilNextSlot time.Duration) error { + withTimeout, cancelWithTimeout := context.WithTimeout(ctx, untilNextSlot) + defer cancelWithTimeout() + + <-withTimeout.Done() + + parentCtxErr := ctx.Err() + if parentCtxErr != nil { + return parentCtxErr + } + + return nil +} diff --git a/lib/babe/slot_test.go b/lib/babe/slot_test.go new file mode 100644 index 0000000000..a3decc9aef --- /dev/null +++ b/lib/babe/slot_test.go @@ -0,0 +1,58 @@ +// Copyright 2023 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package babe + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSlotHandlerConstructor(t *testing.T) { + t.Parallel() + + expected := slotHandler{ + slotDuration: time.Duration(6000), + } + + handler := newSlotHandler(time.Duration(6000)) + require.Equal(t, expected, handler) +} + +func TestSlotHandlerNextSlot(t *testing.T) { + t.Parallel() + + const slotDuration = 2 * time.Second + handler := newSlotHandler(slotDuration) + + firstIteration, err := handler.waitForNextSlot(context.Background()) + require.NoError(t, err) + + secondIteration, err := handler.waitForNextSlot(context.Background()) + require.NoError(t, err) + + require.Greater(t, secondIteration.number, firstIteration.number) +} + +func TestSlotHandlerNextSlot_ContextCanceled(t *testing.T) { + t.Parallel() + + const slotDuration = 2 * time.Second + handler := newSlotHandler(slotDuration) + + ctx, cancel := context.WithCancel(context.Background()) + + firstIteration, err := handler.waitForNextSlot(ctx) + require.NoError(t, err) + require.NotEqual(t, Slot{}, firstIteration) + + cancel() + + secondIteration, err := handler.waitForNextSlot(ctx) + require.Equal(t, Slot{}, secondIteration) + require.ErrorIs(t, err, context.Canceled) + require.EqualError(t, err, "waiting next slot: context canceled") +}