diff --git a/ethereum/spec/src/testFixtures/java/tech/pegasys/teku/spec/util/DataStructureUtil.java b/ethereum/spec/src/testFixtures/java/tech/pegasys/teku/spec/util/DataStructureUtil.java index c9ea4c097e9..f1205eaefea 100644 --- a/ethereum/spec/src/testFixtures/java/tech/pegasys/teku/spec/util/DataStructureUtil.java +++ b/ethereum/spec/src/testFixtures/java/tech/pegasys/teku/spec/util/DataStructureUtil.java @@ -114,6 +114,7 @@ import tech.pegasys.teku.spec.datastructures.state.beaconstate.versions.altair.BeaconStateSchemaAltair; import tech.pegasys.teku.spec.datastructures.type.SszPublicKey; import tech.pegasys.teku.spec.datastructures.util.DepositGenerator; +import tech.pegasys.teku.spec.executionengine.ForkChoiceState; import tech.pegasys.teku.spec.schemas.SchemaDefinitions; import tech.pegasys.teku.spec.schemas.SchemaDefinitionsAltair; import tech.pegasys.teku.spec.schemas.SchemaDefinitionsBellatrix; @@ -981,6 +982,16 @@ public EnrForkId randomEnrForkId() { return new EnrForkId(randomBytes4(), randomBytes4(), randomUInt64()); } + public ForkChoiceState randomForkChoiceState(final boolean optimisticHead) { + return new ForkChoiceState( + randomBytes32(), + randomUInt64(), + randomBytes32(), + randomBytes32(), + randomBytes32(), + optimisticHead); + } + public BeaconState randomBeaconState() { return randomBeaconState(100, 100); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java index f7d538dbddc..59984e2c0f3 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java @@ -123,23 +123,8 @@ public SafeFuture start() { private synchronized void startup() { state.set(State.RUNNING); - queueGossipStart(); - } - - private void queueGossipStart() { - LOG.debug("Check if gossip should be started"); - final UInt64 slotsBehind = recentChainData.getChainHeadSlotsBehind().orElseThrow(); - if (slotsBehind.isLessThanOrEqualTo(500)) { - // Start gossip if we're "close enough" to the chain head - // Note: we don't want to be too strict here, otherwise we could end up with our sync logic - // inactive because our chain is almost caught up to the chainhead, but gossip inactive so - // that our node slowly falls behind because no gossip is propagating. However, if we're too - // aggressive, our node could be down-scored for subscribing to topics that it can't yet - // validate or propagate. + if (isCloseToInSync()) { startGossip(); - } else { - // Schedule a future check - asyncRunner.runAfterDelay(this::queueGossipStart, Duration.ofSeconds(10)).reportExceptions(); } } @@ -164,6 +149,29 @@ private synchronized void startGossip() { setTopicScoringParams(); } + @Override + public void onSyncStateChanged(final boolean isInSync, final boolean isOptimistic) { + if (state.get() != State.RUNNING) { + return; + } + if (isInSync || isCloseToInSync()) { + startGossip(); + } else { + if (gossipStarted.compareAndSet(true, false)) { + LOG.warn("Stopping eth2 gossip while node is syncing"); + gossipForkManager.stopGossip(); + } + } + gossipForkManager.onOptimisticHeadChanged(isOptimistic); + } + + private boolean isCloseToInSync() { + return recentChainData + .getChainHeadSlotsBehind() + .orElse(UInt64.MAX_VALUE) + .isLessThanOrEqualTo(500); + } + private void setTopicScoringParams() { final GossipTopicsScoringConfig topicConfig = gossipConfigurator.configureAllTopics(getEth2Context()); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetwork.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetwork.java index bc1c3b7cd3f..814b53d9831 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetwork.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetwork.java @@ -27,6 +27,8 @@ public interface Eth2P2PNetwork extends P2PNetwork { void onEpoch(UInt64 epoch); + void onSyncStateChanged(final boolean isInSync, final boolean isOptimistic); + void subscribeToAttestationSubnetId(int subnetId); void unsubscribeFromAttestationSubnetId(int subnetId); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipManager.java index a606c1a0505..241d494b111 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/BlockGossipManager.java @@ -51,4 +51,9 @@ public BlockGossipManager( public void publishBlock(final SignedBeaconBlock message) { publishMessage(message); } + + @Override + public boolean isEnabledDuringOptimisticSync() { + return true; + } } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/GossipManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/GossipManager.java index b12e16d5cde..fb9ba2429f3 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/GossipManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/GossipManager.java @@ -17,4 +17,8 @@ public interface GossipManager { void subscribe(); void unsubscribe(); + + default boolean isEnabledDuringOptimisticSync() { + return false; + } } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java index 55f67c42f6b..276ffaae84c 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManager.java @@ -131,6 +131,20 @@ public synchronized void stopGossip() { // Stop all active gossips activeSubscriptions.forEach(GossipForkSubscriptions::stopGossip); activeSubscriptions.clear(); + // Ensure we will create new active subscriptions if we are started again in the same epoch + currentEpoch = Optional.empty(); + } + + public synchronized void onOptimisticHeadChanged(final boolean isHeadOptimistic) { + if (isHeadOptimistic) { + activeSubscriptions.forEach(GossipForkSubscriptions::stopGossipForOptimisticSync); + } else { + activeSubscriptions.forEach( + subscriptions -> + subscriptions.startGossip( + recentChainData.getGenesisData().orElseThrow().getGenesisValidatorsRoot(), + false)); + } } public synchronized void publishAttestation(final ValidateableAttestation attestation) { @@ -237,7 +251,8 @@ private boolean isActive(final GossipForkSubscriptions subscriptions) { private void startSubscriptions(final GossipForkSubscriptions subscription) { if (activeSubscriptions.add(subscription)) { subscription.startGossip( - recentChainData.getGenesisData().orElseThrow().getGenesisValidatorsRoot()); + recentChainData.getGenesisData().orElseThrow().getGenesisValidatorsRoot(), + recentChainData.getOptimisticHead().isPresent()); currentAttestationSubnets.forEach(subscription::subscribeToAttestationSubnetId); currentSyncCommitteeSubnets.forEach(subscription::subscribeToSyncCommitteeSubnet); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkSubscriptions.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkSubscriptions.java index 7643ce7577d..3ac23d7ef91 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkSubscriptions.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkSubscriptions.java @@ -27,10 +27,12 @@ public interface GossipForkSubscriptions { UInt64 getActivationEpoch(); - void startGossip(Bytes32 genesisValidatorsRoot); + void startGossip(Bytes32 genesisValidatorsRoot, boolean isOptimisticHead); void stopGossip(); + void stopGossipForOptimisticSync(); + void publishAttestation(ValidateableAttestation attestation); void publishBlock(SignedBeaconBlock block); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsPhase0.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsPhase0.java index 1e92e01cd51..60577ff6297 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsPhase0.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/gossip/forks/versions/GossipForkSubscriptionsPhase0.java @@ -106,10 +106,15 @@ public UInt64 getActivationEpoch() { } @Override - public final void startGossip(final Bytes32 genesisValidatorsRoot) { - final ForkInfo forkInfo = new ForkInfo(fork, genesisValidatorsRoot); - addGossipManagers(forkInfo); - gossipManagers.forEach(GossipManager::subscribe); + public final void startGossip( + final Bytes32 genesisValidatorsRoot, final boolean isOptimisticHead) { + if (gossipManagers.isEmpty()) { + final ForkInfo forkInfo = new ForkInfo(fork, genesisValidatorsRoot); + addGossipManagers(forkInfo); + } + gossipManagers.stream() + .filter(manager -> manager.isEnabledDuringOptimisticSync() || !isOptimisticHead) + .forEach(GossipManager::subscribe); } protected void addGossipManagers(final ForkInfo forkInfo) { @@ -194,6 +199,13 @@ public void stopGossip() { gossipManagers.forEach(GossipManager::unsubscribe); } + @Override + public void stopGossipForOptimisticSync() { + gossipManagers.stream() + .filter(manager -> !manager.isEnabledDuringOptimisticSync()) + .forEach(GossipManager::unsubscribe); + } + @Override public void publishAttestation(final ValidateableAttestation attestation) { attestationGossipManager.onNewAttestation(attestation); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/mock/NoOpEth2P2PNetwork.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/mock/NoOpEth2P2PNetwork.java index 3cf5350c80a..0bee52132f7 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/mock/NoOpEth2P2PNetwork.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/mock/NoOpEth2P2PNetwork.java @@ -35,6 +35,9 @@ public NoOpEth2P2PNetwork(final Spec spec) { @Override public void onEpoch(final UInt64 epoch) {} + @Override + public void onSyncStateChanged(final boolean isInSync, final boolean isOptimistic) {} + @Override public void subscribeToAttestationSubnetId(final int subnetId) {} diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java index 3ebc1453c19..3d898f3bfb1 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java @@ -183,6 +183,52 @@ public void unsubscribeFromSyncCommitteeSubnetId_shouldUpdateDiscoveryENR() { assertThat(capturedValues.get(3)).containsExactlyInAnyOrder(1, 3); } + @Test + void onSyncStateChanged_shouldEnableGossipWhenInSync() { + // Current slot is a long way beyond the chain head + storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(1000)); + + assertThat(network.start()).isCompleted(); + // Won't start gossip as chain head is too old + verify(gossipForkManager, never()).configureGossipForEpoch(any()); + + network.onSyncStateChanged(true, false); + + // Even though we're a long way behind, start gossip because we believe we're in sync + verify(gossipForkManager).configureGossipForEpoch(any()); + } + + @Test + void onSyncStateChanged_shouldStopGossipWhenTooFarBehindAndNotInSync() { + // Current slot is a long way beyond the chain head + storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(1000)); + + assertThat(network.start()).isCompleted(); + network.onSyncStateChanged(true, false); + // Even though we're a long way behind, start gossip because we believe we're in sync + verify(gossipForkManager).configureGossipForEpoch(any()); + + network.onSyncStateChanged(false, false); + verify(gossipForkManager).stopGossip(); + } + + @Test + void onSyncStateChanged_shouldNotifyForkManagerOfOptimisticSyncState() { + assertThat(network.start()).isCompleted(); + + network.onSyncStateChanged(false, true); + verify(gossipForkManager).onOptimisticHeadChanged(true); + + network.onSyncStateChanged(false, false); + verify(gossipForkManager).onOptimisticHeadChanged(false); + + network.onSyncStateChanged(true, true); + verify(gossipForkManager, times(2)).onOptimisticHeadChanged(true); + + network.onSyncStateChanged(true, false); + verify(gossipForkManager, times(2)).onOptimisticHeadChanged(false); + } + @SuppressWarnings("unchecked") private ArgumentCaptor> subnetIdCaptor() { return ArgumentCaptor.forClass(Iterable.class); diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManagerTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManagerTest.java index 0f589466c15..777b5412bd6 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManagerTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/gossip/forks/GossipForkManagerTest.java @@ -15,6 +15,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -72,7 +73,7 @@ void shouldActivateCurrentForkOnStart() { final GossipForkManager manager = builder().fork(currentForkSubscriptions).build(); manager.configureGossipForEpoch(UInt64.ZERO); - verify(currentForkSubscriptions).startGossip(GENESIS_VALIDATORS_ROOT); + verify(currentForkSubscriptions).startGossip(GENESIS_VALIDATORS_ROOT, false); } @Test @@ -84,8 +85,8 @@ void shouldActivateCurrentAndNextForkOnStartIfNextForkWithinTwoEpochs() { manager.configureGossipForEpoch(UInt64.valueOf(3)); - verify(currentForkSubscriptions).startGossip(GENESIS_VALIDATORS_ROOT); - verify(nextForkSubscriptions).startGossip(GENESIS_VALIDATORS_ROOT); + verify(currentForkSubscriptions).startGossip(GENESIS_VALIDATORS_ROOT, false); + verify(nextForkSubscriptions).startGossip(GENESIS_VALIDATORS_ROOT, false); } @Test @@ -98,10 +99,10 @@ void shouldActivateMultipleFutureForksIfTheyAreWithinTwoEpochs() { managerForForks(currentFork, nextFork, laterFork, tooLateFork) .configureGossipForEpoch(UInt64.ONE); - verify(currentFork).startGossip(GENESIS_VALIDATORS_ROOT); - verify(nextFork).startGossip(GENESIS_VALIDATORS_ROOT); - verify(laterFork).startGossip(GENESIS_VALIDATORS_ROOT); - verify(tooLateFork, never()).startGossip(any()); + verify(currentFork).startGossip(GENESIS_VALIDATORS_ROOT, false); + verify(nextFork).startGossip(GENESIS_VALIDATORS_ROOT, false); + verify(laterFork).startGossip(GENESIS_VALIDATORS_ROOT, false); + verify(tooLateFork, never()).startGossip(any(), anyBoolean()); } @Test @@ -113,8 +114,8 @@ void shouldNotStartNextForkIfNotWithinTwoEpochs() { manager.configureGossipForEpoch(UInt64.valueOf(2)); - verify(currentForkSubscriptions).startGossip(GENESIS_VALIDATORS_ROOT); - verify(nextForkSubscriptions, never()).startGossip(any()); + verify(currentForkSubscriptions).startGossip(GENESIS_VALIDATORS_ROOT, false); + verify(nextForkSubscriptions, never()).startGossip(any(), anyBoolean()); } @Test @@ -133,6 +134,27 @@ void shouldStopActiveSubscriptionsOnStop() { verify(laterForkSubscriptions, never()).stopGossip(); } + @Test + void shouldResubscribeAfterStopping() { + final GossipForkSubscriptions currentForkSubscriptions = forkAtEpoch(0); + final GossipForkSubscriptions nextForkSubscriptions = forkAtEpoch(5); + final GossipForkSubscriptions laterForkSubscriptions = forkAtEpoch(10); + final GossipForkManager manager = + managerForForks(currentForkSubscriptions, nextForkSubscriptions, laterForkSubscriptions); + manager.configureGossipForEpoch(UInt64.valueOf(3)); + verify(currentForkSubscriptions, times(1)).startGossip(any(), anyBoolean()); + verify(nextForkSubscriptions, times(1)).startGossip(any(), anyBoolean()); + + manager.stopGossip(); + + verify(currentForkSubscriptions).stopGossip(); + verify(nextForkSubscriptions).stopGossip(); + + manager.configureGossipForEpoch(UInt64.valueOf(3)); + verify(currentForkSubscriptions, times(2)).startGossip(any(), anyBoolean()); + verify(nextForkSubscriptions, times(2)).startGossip(any(), anyBoolean()); + } + @Test void shouldStopForkTwoEpochsAfterTheNextOneActivates() { final GossipForkSubscriptions genesisFork = forkAtEpoch(0); @@ -141,14 +163,14 @@ void shouldStopForkTwoEpochsAfterTheNextOneActivates() { final GossipForkManager manager = managerForForks(genesisFork, newFork); manager.configureGossipForEpoch(UInt64.valueOf(4)); - verify(genesisFork).startGossip(GENESIS_VALIDATORS_ROOT); - verify(newFork).startGossip(GENESIS_VALIDATORS_ROOT); + verify(genesisFork).startGossip(GENESIS_VALIDATORS_ROOT, false); + verify(newFork).startGossip(GENESIS_VALIDATORS_ROOT, false); // Shouldn't make any changes in epochs 5 or 6 manager.configureGossipForEpoch(UInt64.valueOf(5)); manager.configureGossipForEpoch(UInt64.valueOf(6)); - verify(genesisFork, times(1)).startGossip(GENESIS_VALIDATORS_ROOT); - verify(newFork, times(1)).startGossip(GENESIS_VALIDATORS_ROOT); + verify(genesisFork, times(1)).startGossip(GENESIS_VALIDATORS_ROOT, false); + verify(newFork, times(1)).startGossip(GENESIS_VALIDATORS_ROOT, false); verify(genesisFork, never()).stopGossip(); verify(newFork, never()).stopGossip(); @@ -169,17 +191,17 @@ void shouldProcessForkChangesWhenEpochsAreMissed() { // Should start the genesis subscriptions on first call manager.configureGossipForEpoch(UInt64.ZERO); - verify(genesisFork).startGossip(GENESIS_VALIDATORS_ROOT); + verify(genesisFork).startGossip(GENESIS_VALIDATORS_ROOT, false); // Jump to epoch 10 and should wind up with only laterFork active manager.configureGossipForEpoch(UInt64.valueOf(10)); verify(genesisFork).stopGossip(); // No point starting newFork as it's already due to be stopped - verify(newFork, never()).startGossip(GENESIS_VALIDATORS_ROOT); + verify(newFork, never()).startGossip(GENESIS_VALIDATORS_ROOT, false); verify(newFork, never()).stopGossip(); - verify(laterFork).startGossip(GENESIS_VALIDATORS_ROOT); + verify(laterFork).startGossip(GENESIS_VALIDATORS_ROOT, false); } @Test @@ -359,7 +381,7 @@ void shouldSubscribeToCurrentAttestationSubnetsWhenNewForkActivates( manager.configureGossipForEpoch(UInt64.valueOf(8)); - verify(secondFork).startGossip(GENESIS_VALIDATORS_ROOT); + verify(secondFork).startGossip(GENESIS_VALIDATORS_ROOT, false); subscriptionType.verifySubscribe(secondFork, 1); subscriptionType.verifySubscribe(secondFork, 2); subscriptionType.verifySubscribe(secondFork, 5); @@ -437,12 +459,52 @@ void shouldNotSubscribeToSubnetThatWasUnsubscribedWhenNewForkActivates( manager.configureGossipForEpoch(UInt64.valueOf(8)); - verify(secondFork).startGossip(GENESIS_VALIDATORS_ROOT); + verify(secondFork).startGossip(GENESIS_VALIDATORS_ROOT, false); subscriptionType.verifySubscribe(secondFork, 1); subscriptionType.verifyNotSubscribed(secondFork, 2); subscriptionType.verifySubscribe(secondFork, 5); } + @Test + void shouldStopAndRestartNonOptimisticSyncTopics() { + final GossipForkSubscriptions subscriptions = forkAtEpoch(0); + final GossipForkManager manager = managerForForks(subscriptions); + + manager.configureGossipForEpoch(UInt64.ZERO); + verify(subscriptions, times(1)).startGossip(GENESIS_VALIDATORS_ROOT, false); + + manager.onOptimisticHeadChanged(true); + verify(subscriptions).stopGossipForOptimisticSync(); + + manager.onOptimisticHeadChanged(false); + verify(subscriptions, times(2)).startGossip(GENESIS_VALIDATORS_ROOT, false); + } + + @Test + void shouldIgnoreOptimisticHeadChangesWhenNotStarted() { + final GossipForkSubscriptions subscriptions = forkAtEpoch(0); + final GossipForkManager manager = managerForForks(subscriptions); + + manager.onOptimisticHeadChanged(true); + verify(subscriptions, never()).stopGossipForOptimisticSync(); + + manager.onOptimisticHeadChanged(false); + verify(subscriptions, never()).startGossip(any(), anyBoolean()); + } + + @Test + void shouldStartSubscriptionsInOptimisticSyncMode() { + when(recentChainData.getOptimisticHead()) + .thenReturn(Optional.of(dataStructureUtil.randomForkChoiceState(true))); + + final GossipForkSubscriptions subscriptions = forkAtEpoch(0); + final GossipForkManager manager = managerForForks(subscriptions); + + manager.configureGossipForEpoch(UInt64.ZERO); + + verify(subscriptions).startGossip(GENESIS_VALIDATORS_ROOT, true); + } + private GossipForkSubscriptions forkAtEpoch(final long epoch) { final GossipForkSubscriptions subscriptions = mock(GossipForkSubscriptions.class, "subscriptionsForEpoch" + epoch); diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index 865358e635e..c23db3f3c48 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -119,6 +119,7 @@ import tech.pegasys.teku.sync.SyncService; import tech.pegasys.teku.sync.SyncServiceFactory; import tech.pegasys.teku.sync.events.CoalescingChainHeadChannel; +import tech.pegasys.teku.sync.events.SyncState; import tech.pegasys.teku.validator.api.InteropConfig; import tech.pegasys.teku.validator.api.ValidatorApiChannel; import tech.pegasys.teku.validator.api.ValidatorPerformanceTrackingMode; @@ -856,6 +857,13 @@ public void initSyncService() { .ifPresent( isOptimistic -> syncService.getOptimisticSyncSubscriber().onOptimisticHeadChanged(isOptimistic)); + + // Subscribe to sync state changes so gossip can be enabled and disabled appropriately + syncService.subscribeToSyncStateChanges( + state -> p2pNetwork.onSyncStateChanged(state.isInSync(), state.isOptimistic())); + // Ensure the initial state is setup correctly + final SyncState currentSyncState = syncService.getCurrentSyncState(); + p2pNetwork.onSyncStateChanged(currentSyncState.isInSync(), currentSyncState.isOptimistic()); } protected void initOperationsReOrgManager() { diff --git a/sync/src/main/java/tech/pegasys/teku/sync/events/SyncState.java b/sync/src/main/java/tech/pegasys/teku/sync/events/SyncState.java index 08bc742dc23..b0510eb918b 100644 --- a/sync/src/main/java/tech/pegasys/teku/sync/events/SyncState.java +++ b/sync/src/main/java/tech/pegasys/teku/sync/events/SyncState.java @@ -16,6 +16,7 @@ public enum SyncState { START_UP, SYNCING, + OPTIMISTIC_SYNCING, IN_SYNC; public boolean isInSync() { @@ -27,6 +28,10 @@ public boolean isStartingUp() { } public boolean isSyncing() { - return this == SYNCING; + return this == SYNCING || this == OPTIMISTIC_SYNCING; + } + + public boolean isOptimistic() { + return this == OPTIMISTIC_SYNCING; } } diff --git a/sync/src/main/java/tech/pegasys/teku/sync/events/SyncStateTracker.java b/sync/src/main/java/tech/pegasys/teku/sync/events/SyncStateTracker.java index e01d6cf869a..bd5c4385ac1 100644 --- a/sync/src/main/java/tech/pegasys/teku/sync/events/SyncStateTracker.java +++ b/sync/src/main/java/tech/pegasys/teku/sync/events/SyncStateTracker.java @@ -103,7 +103,9 @@ public boolean unsubscribeFromSyncStateChanges(long subscriberId) { private void updateCurrentState() { final SyncState previousState = currentState; - if (syncActive || headIsOptimistic) { + if (headIsOptimistic) { + currentState = SyncState.OPTIMISTIC_SYNCING; + } else if (syncActive) { currentState = SyncState.SYNCING; } else if (startingUp) { currentState = SyncState.START_UP; diff --git a/sync/src/test/java/tech/pegasys/teku/sync/events/SyncStateTrackerTest.java b/sync/src/test/java/tech/pegasys/teku/sync/events/SyncStateTrackerTest.java index 4bd909a6e16..b8cc4018e3d 100644 --- a/sync/src/test/java/tech/pegasys/teku/sync/events/SyncStateTrackerTest.java +++ b/sync/src/test/java/tech/pegasys/teku/sync/events/SyncStateTrackerTest.java @@ -120,11 +120,12 @@ public void shouldNotLogOptimisticMessagesOnStartUp() { // start syncing syncSubscriber.onSyncingChange(true); - assertSyncState(SyncState.SYNCING); + assertSyncState(SyncState.OPTIMISTIC_SYNCING); verify(eventLogger).syncStart(); // head no more optimistic tracker.onOptimisticHeadChanged(false); + assertSyncState(SyncState.SYNCING); // in sync syncSubscriber.onSyncingChange(false); @@ -133,7 +134,7 @@ public void shouldNotLogOptimisticMessagesOnStartUp() { // turn head optimistic tracker.onOptimisticHeadChanged(true); - assertSyncState(SyncState.SYNCING); + assertSyncState(SyncState.OPTIMISTIC_SYNCING); verifyNoMoreInteractions(eventLogger); }