Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

failed broadcast is not a routing error #3843

Merged
merged 1 commit into from
Jul 7, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 84 additions & 104 deletions beacon_chain/validators/message_router.nim
Original file line number Diff line number Diff line change
Expand Up @@ -133,20 +133,26 @@ proc routeSignedBeaconBlock*(
let
sendTime = router[].getCurrentBeaconTime()
delay = sendTime - blck.message.slot.block_deadline()
# The block passed basic gossip validation - we can "safely" broadcast it
# now. In fact, per the spec, we should broadcast it even if it later fails
# to apply to our state.
res = await router[].network.broadcastBeaconBlock(blck)

# The block passed basic gossip validation - we can "safely" broadcast it now.
# In fact, per the spec, we should broadcast it even if it later fails to
# apply to our state.
block:
let res = await router[].network.broadcastBeaconBlock(blck)
if res.isErr:
notice "Block not sent",
blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
signature = shortLog(blck.signature), error = res.error()
return err(res.error())
if res.isOk():
beacon_blocks_sent.inc()
beacon_blocks_sent_delay.observe(delay.toFloatSeconds())

notice "Block sent",
blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
signature = shortLog(blck.signature), delay
else: # "no broadcast" is not a fatal error
notice "Block not sent",
blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
signature = shortLog(blck.signature), error = res.error()

let newBlockRef = await router[].blockProcessor.storeBlock(
MsgSource.api, sendTime, blck, true)
let
newBlockRef = await router[].blockProcessor.storeBlock(
MsgSource.api, sendTime, blck, true)

# The boolean we return tells the caller whether the block was integrated
# into the chain
Expand All @@ -156,13 +162,6 @@ proc routeSignedBeaconBlock*(
signature = shortLog(blck.signature), err = newBlockRef.error()
return ok(Opt.none(BlockRef))

beacon_blocks_sent.inc()
beacon_blocks_sent_delay.observe(delay.toFloatSeconds())

notice "Block sent",
blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
signature = shortLog(blck.signature), delay

return ok(Opt.some(newBlockRef.get()))

proc routeAttestation*(
Expand All @@ -182,20 +181,17 @@ proc routeAttestation*(
let
sendTime = router[].processor.getCurrentBeaconTime()
delay = sendTime - attestation.data.slot.attestation_deadline()
res = await router[].network.broadcastAttestation(subnet_id, attestation)

block:
let res = await router[].network.broadcastAttestation(
subnet_id, attestation)
if not res.isOk:
notice "Attestation not sent",
attestation = shortLog(attestation), error = res.error()
return err(res.error())

beacon_attestations_sent.inc()
beacon_attestation_sent_delay.observe(delay.toFloatSeconds())
if res.isOk():
beacon_attestations_sent.inc()
beacon_attestation_sent_delay.observe(delay.toFloatSeconds())

notice "Attestation sent",
attestation = shortLog(attestation), delay, subnet_id
notice "Attestation sent",
attestation = shortLog(attestation), delay, subnet_id
else: # "no broadcast" is not a fatal error
notice "Attestation not sent",
attestation = shortLog(attestation), error = res.error()

return ok()

Expand Down Expand Up @@ -250,23 +246,21 @@ proc routeSignedAggregateAndProof*(
let
sendTime = router[].processor.getCurrentBeaconTime()
delay = sendTime - proof.message.aggregate.data.slot.aggregate_deadline()

block:
let res = await router[].network.broadcastAggregateAndProof(proof)
if not res.isOk():
notice "Aggregated attestation not sent",
attestation = shortLog(proof.message.aggregate),
aggregator_index = proof.message.aggregator_index,
signature = shortLog(proof.signature), error = res.error()
return err(res.error())

beacon_aggregates_sent.inc()

notice "Aggregated attestation sent",
attestation = shortLog(proof.message.aggregate),
aggregator_index = proof.message.aggregator_index,
selection_proof = shortLog(proof.message.selection_proof),
signature = shortLog(proof.signature), delay
res = await router[].network.broadcastAggregateAndProof(proof)

if res.isOk():
beacon_aggregates_sent.inc()

notice "Aggregated attestation sent",
attestation = shortLog(proof.message.aggregate),
aggregator_index = proof.message.aggregator_index,
selection_proof = shortLog(proof.message.selection_proof),
signature = shortLog(proof.signature), delay
else: # "no broadcast" is not a fatal error
notice "Aggregated attestation not sent",
attestation = shortLog(proof.message.aggregate),
aggregator_index = proof.message.aggregator_index,
signature = shortLog(proof.signature), error = res.error()

return ok()

Expand All @@ -287,18 +281,17 @@ proc routeSyncCommitteeMessage*(
sendTime = router[].processor.getCurrentBeaconTime()
delay = sendTime - msg.slot.sync_committee_message_deadline()

block:
let res = await router[].network.broadcastSyncCommitteeMessage(
res = await router[].network.broadcastSyncCommitteeMessage(
msg, subcommitteeIdx)
if not res.isOk():
notice "Sync committee message not sent",
message = shortLog(msg), error = res.error()
return err(res.error())

beacon_sync_committee_messages_sent.inc()
beacon_sync_committee_message_sent_delay.observe(delay.toFloatSeconds())
if res.isOk():
beacon_sync_committee_messages_sent.inc()
beacon_sync_committee_message_sent_delay.observe(delay.toFloatSeconds())

notice "Sync committee message sent", message = shortLog(msg), delay
notice "Sync committee message sent", message = shortLog(msg), delay
else: # "no broadcast" is not a fatal error
notice "Sync committee message not sent",
message = shortLog(msg), error = res.error()

if router[].onSyncCommitteeMessage != nil:
router[].onSyncCommitteeMessage(msg.slot)
Expand Down Expand Up @@ -407,23 +400,20 @@ proc routeSignedContributionAndProof*(
sendTime = router[].processor.getCurrentBeaconTime()
delay = sendTime - msg.message.contribution.slot.sync_contribution_deadline()

block:
let res = await router[].network.broadcastSignedContributionAndProof(msg)
if not res.isOk():
notice "Contribution not sent",
contribution = shortLog(msg.message.contribution),
aggregator_index = msg.message.aggregator_index,
selection_proof = shortLog(msg.message.selection_proof),
signature = shortLog(msg.signature), error = res.error()
return err(res.error())

beacon_sync_committee_contributions_sent.inc()

notice "Contribution sent",
contribution = shortLog(msg.message.contribution),
aggregator_index = msg.message.aggregator_index,
selection_proof = shortLog(msg.message.selection_proof),
signature = shortLog(msg.signature), delay
let res = await router[].network.broadcastSignedContributionAndProof(msg)
if res.isOk():
beacon_sync_committee_contributions_sent.inc()
notice "Contribution sent",
contribution = shortLog(msg.message.contribution),
aggregator_index = msg.message.aggregator_index,
selection_proof = shortLog(msg.message.selection_proof),
signature = shortLog(msg.signature), delay
else: # "no broadcast" is not a fatal error
notice "Contribution not sent",
contribution = shortLog(msg.message.contribution),
aggregator_index = msg.message.aggregator_index,
selection_proof = shortLog(msg.message.selection_proof),
signature = shortLog(msg.signature), error = res.error()

return ok()

Expand All @@ -438,16 +428,12 @@ proc routeSignedVoluntaryExit*(
exit = shortLog(exit), error = res.error()
return err(res.error()[1])

block:
let res = await router[].network.broadcastVoluntaryExit(exit)
if not res.isOk():
notice "Voluntary exit not sent",
exit = shortLog(exit), error = res.error()
return err(res.error())

beacon_voluntary_exits_sent.inc()

notice "Voluntary exit sent", exit = shortLog(exit)
let res = await router[].network.broadcastVoluntaryExit(exit)
if res.isOk():
beacon_voluntary_exits_sent.inc()
notice "Voluntary exit sent", exit = shortLog(exit)
else: # "no broadcast" is not a fatal error
notice "Voluntary exit not sent", exit = shortLog(exit), error = res.error()

return ok()

Expand All @@ -462,16 +448,13 @@ proc routeAttesterSlashing*(
slashing = shortLog(slashing), error = res.error()
return err(res.error()[1])

block:
let res = await router[].network.broadcastAttesterSlashing(slashing)
if not res.isOk():
notice "Attester slashing not sent",
slashing = shortLog(slashing), error = res.error()
return err(res.error())

beacon_attester_slashings_sent.inc()

notice "Attester slashing sent", slashing = shortLog(slashing)
let res = await router[].network.broadcastAttesterSlashing(slashing)
if res.isOk():
beacon_attester_slashings_sent.inc()
notice "Attester slashing sent", slashing = shortLog(slashing)
else: # "no broadcast" is not a fatal error
notice "Attester slashing not sent",
slashing = shortLog(slashing), error = res.error()

return ok()

Expand All @@ -486,15 +469,12 @@ proc routeProposerSlashing*(
slashing = shortLog(slashing), error = res.error()
return err(res.error()[1])

block:
let res = await router[].network.broadcastProposerSlashing(slashing)
if not res.isOk():
notice "Proposer slashing not sent",
slashing = shortLog(slashing), error = res.error()
return err(res.error())

beacon_proposer_slashings_sent.inc()

notice "Proposer slashing sent", slashing = shortLog(slashing)
let res = await router[].network.broadcastProposerSlashing(slashing)
if res.isOk():
beacon_proposer_slashings_sent.inc()
notice "Proposer slashing sent", slashing = shortLog(slashing)
else: # "no broadcast" is not a fatal error
notice "Proposer slashing not sent",
slashing = shortLog(slashing), error = res.error()

return ok()