Skip to content

Commit

Permalink
Quarantine and reassembly of gossiped blobs and blocks (#4808)
Browse files Browse the repository at this point in the history
  • Loading branch information
henridf authored Apr 13, 2023
1 parent fa36555 commit 021de18
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 42 deletions.
70 changes: 70 additions & 0 deletions beacon_chain/consensus_object_pools/blob_quarantine.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# beacon_chain
# Copyright (c) 2018-2023 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

import
std/tables,
../spec/datatypes/deneb


const
MaxBlobs = SLOTS_PER_EPOCH * MAX_BLOBS_PER_BLOCK


type
BlobQuarantine* = object
blobs*: Table[(Eth2Digest, BlobIndex), ref BlobSidecar]


func put*(quarantine: var BlobQuarantine, blobSidecar: ref BlobSidecar) =
if quarantine.blobs.lenu64 > MaxBlobs:
return
discard quarantine.blobs.hasKeyOrPut((blobSidecar.block_root,
blobSidecar.index), blobSidecar)

func blobIndices*(quarantine: BlobQuarantine, digest: Eth2Digest):
seq[BlobIndex] =
var r: seq[BlobIndex] = @[]
for i in 0..MAX_BLOBS_PER_BLOCK-1:
if quarantine.blobs.hasKey((digest, i)):
r.add(i)
r

func hasBlob*(quarantine: BlobQuarantine, blobSidecar: BlobSidecar) : bool =
quarantine.blobs.hasKey((blobSidecar.block_root, blobSidecar.index))

func popBlobs*(quarantine: var BlobQuarantine, digest: Eth2Digest):
seq[ref BlobSidecar] =
var r: seq[ref BlobSidecar] = @[]
for i in 0..MAX_BLOBS_PER_BLOCK-1:
var b: ref BlobSidecar
if quarantine.blobs.pop((digest, i), b):
r.add(b)
r

func peekBlobs*(quarantine: var BlobQuarantine, digest: Eth2Digest):
seq[ref BlobSidecar] =
var r: seq[ref BlobSidecar] = @[]
for i in 0..MAX_BLOBS_PER_BLOCK-1:
quarantine.blobs.withValue((digest, i), value):
r.add(value[])
r

func removeBlobs*(quarantine: var BlobQuarantine, digest: Eth2Digest) =
for i in 0..MAX_BLOBS_PER_BLOCK-1:
quarantine.blobs.del((digest, i))

func hasBlobs*(quarantine: BlobQuarantine, blck: deneb.SignedBeaconBlock):
bool =
let idxs = quarantine.blobIndices(blck.root)
if len(blck.message.body.blob_kzg_commitments) != len(idxs):
return false
for i in 0..len(idxs):
if idxs[i] != uint64(i):
return false
true
105 changes: 85 additions & 20 deletions beacon_chain/consensus_object_pools/block_quarantine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ const
## Arbitrary
MaxOrphans = SLOTS_PER_EPOCH * 3
## Enough for finalization in an alternative fork
MaxBlobless = SLOTS_PER_EPOCH
## Arbitrary
MaxUnviables = 16 * 1024
## About a day of blocks - most likely not needed but it's quite cheap..

Expand All @@ -39,10 +41,19 @@ type
## Trivially invalid blocks may be dropped before reaching this stage.

orphans*: Table[(Eth2Digest, ValidatorSig), ForkedSignedBeaconBlock]
## Blocks that we don't have a parent for - when we resolve the parent, we
## can proceed to resolving the block as well - we index this by root and
## signature such that a block with invalid signature won't cause a block
## with a valid signature to be dropped
## Blocks that we don't have a parent for - when we resolve the
## parent, we can proceed to resolving the block as well - we
## index this by root and signature such that a block with
## invalid signature won't cause a block with a valid signature
## to be dropped. An orphan block may also be "blobless" (see
## below) - if so, upon resolving the parent, it should be
## stored in the blobless table.

blobless*: Table[(Eth2Digest, ValidatorSig), deneb.SignedBeaconBlock]
## Blocks that we don't have blobs for. When we have received
## all blobs for this block, we can proceed to resolving the
## block as well. A blobless block inserted into this table must
## have a resolved parent (i.e., it is not an orphan).

unviable*: OrderedTable[Eth2Digest, tuple[]]
## Unviable blocks are those that come from a history that does not
Expand Down Expand Up @@ -115,12 +126,14 @@ func removeOrphan*(
quarantine: var Quarantine, signedBlock: ForkySignedBeaconBlock) =
quarantine.orphans.del((signedBlock.root, signedBlock.signature))

func isViableOrphan(
finalizedSlot: Slot, signedBlock: ForkedSignedBeaconBlock): bool =
func removeBlobless*(
quarantine: var Quarantine, signedBlock: ForkySignedBeaconBlock) =
quarantine.blobless.del((signedBlock.root, signedBlock.signature))

func isViable(
finalizedSlot: Slot, slot: Slot): bool =
# The orphan must be newer than the finalization point so that its parent
# either is the finalized block or more recent
let
slot = getForkedBlockField(signedBlock, slot)
slot > finalizedSlot

func cleanupUnviable(quarantine: var Quarantine) =
Expand All @@ -131,47 +144,76 @@ func cleanupUnviable(quarantine: var Quarantine) =
break # Cannot modify while for-looping
quarantine.unviable.del(toDel)

func addUnviable*(quarantine: var Quarantine, root: Eth2Digest) =
if root in quarantine.unviable:
return

quarantine.cleanupUnviable()

func removeUnviableTree[T](quarantine: var Quarantine,
toCheck: var seq[Eth2Digest],
tbl: var Table[(Eth2Digest, ValidatorSig),
T]): seq[Eth2Digest] =
# Remove the tree of orphans whose ancestor is unviable - they are now also
# unviable! This helps avoiding junk in the quarantine, because we don't keep
# unviable parents in the DAG and there's no way to tell an orphan from an
# unviable block without the parent.
var
toRemove: seq[(Eth2Digest, ValidatorSig)] # Can't modify while iterating
toCheck = @[root]
checked: seq[Eth2Digest]
while toCheck.len > 0:
let root = toCheck.pop()
for k, v in quarantine.orphans.mpairs():
if getForkedBlockField(v, parent_root) == root:
if root notin checked:
checked.add(root)
for k, v in tbl.mpairs():
when T is ForkedSignedBeaconBlock:
let blockRoot = getForkedBlockField(v, parent_root)
elif T is ForkySignedBeaconBlock:
let blockRoot = v.message.parent_root
else:
static: doAssert false

if blockRoot == root:
toCheck.add(k[0])
toRemove.add(k)
elif k[0] == root:
toRemove.add(k)

for k in toRemove:
quarantine.orphans.del k
tbl.del k
quarantine.unviable[k[0]] = ()

toRemove.setLen(0)

checked

func addUnviable*(quarantine: var Quarantine, root: Eth2Digest) =
if root in quarantine.unviable:
return

quarantine.cleanupUnviable()
var toCheck = @[root]
var checked = quarantine.removeUnviableTree(toCheck, quarantine.orphans)
discard quarantine.removeUnviableTree(checked, quarantine.blobless)

quarantine.unviable[root] = ()

func cleanupOrphans(quarantine: var Quarantine, finalizedSlot: Slot) =
var toDel: seq[(Eth2Digest, ValidatorSig)]

for k, v in quarantine.orphans:
if not isViableOrphan(finalizedSlot, v):
if not isViable(finalizedSlot, getForkedBlockField(v, slot)):
toDel.add k

for k in toDel:
quarantine.addUnviable k[0]
quarantine.orphans.del k

func cleanupBlobless(quarantine: var Quarantine, finalizedSlot: Slot) =
var toDel: seq[(Eth2Digest, ValidatorSig)]

for k, v in quarantine.blobless:
if not isViable(finalizedSlot, v.message.slot):
toDel.add k

for k in toDel:
quarantine.addUnviable k[0]
quarantine.blobless.del k

func clearAfterReorg*(quarantine: var Quarantine) =
## Clear missing and orphans to start with a fresh slate in case of a reorg
## Unviables remain unviable and are not cleared.
Expand All @@ -193,7 +235,7 @@ func addOrphan*(
quarantine: var Quarantine, finalizedSlot: Slot,
signedBlock: ForkedSignedBeaconBlock): bool =
## Adds block to quarantine's `orphans` and `missing` lists.
if not isViableOrphan(finalizedSlot, signedBlock):
if not isViable(finalizedSlot, getForkedBlockField(signedBlock, slot)):
quarantine.addUnviable(signedBlock.root)
return false

Expand Down Expand Up @@ -230,3 +272,26 @@ iterator pop*(quarantine: var Quarantine, root: Eth2Digest):
if getForkedBlockField(v, parent_root) == root:
toRemove.add(k)
yield v

proc addBlobless*(
quarantine: var Quarantine, finalizedSlot: Slot,
signedBlock: deneb.SignedBeaconBlock): bool =

if not isViable(finalizedSlot, signedBlock.message.slot):
quarantine.addUnviable(signedBlock.root)
return false

quarantine.cleanupBlobless(finalizedSlot)

if quarantine.blobless.lenu64 >= MaxBlobless:
return false

quarantine.blobless[(signedBlock.root, signedBlock.signature)] = signedBlock
quarantine.missing.del(signedBlock.root)
true

iterator peekBlobless*(quarantine: var Quarantine, root: Eth2Digest):
deneb.SignedBeaconBlock =
for k, v in quarantine.blobless.mpairs():
if k[0] == root:
yield v
24 changes: 22 additions & 2 deletions beacon_chain/gossip_processing/block_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ from ../consensus_object_pools/block_dag import BlockRef, root, shortLog, slot
from ../consensus_object_pools/block_pools_types import
EpochRef, VerifierError
from ../consensus_object_pools/block_quarantine import
addOrphan, addUnviable, pop, removeOrphan
addBlobless, addOrphan, addUnviable, pop, removeOrphan
from ../consensus_object_pools/blob_quarantine import
BlobQuarantine, hasBlobs, popBlobs
from ../validators/validator_monitor import
MsgSource, ValidatorMonitor, registerAttestationInBlock, registerBeaconBlock,
registerSyncAggregateInBlock
Expand Down Expand Up @@ -91,6 +93,7 @@ type
validatorMonitor: ref ValidatorMonitor
getBeaconTime: GetBeaconTimeFn

blobQuarantine: ref BlobQuarantine
verifier: BatchVerifier

lastPayload: Slot
Expand Down Expand Up @@ -123,6 +126,7 @@ proc new*(T: type BlockProcessor,
rng: ref HmacDrbgContext, taskpool: TaskPoolPtr,
consensusManager: ref ConsensusManager,
validatorMonitor: ref ValidatorMonitor,
blobQuarantine: ref BlobQuarantine,
getBeaconTime: GetBeaconTimeFn): ref BlockProcessor =
(ref BlockProcessor)(
dumpEnabled: dumpEnabled,
Expand All @@ -131,6 +135,7 @@ proc new*(T: type BlockProcessor,
blockQueue: newAsyncQueue[BlockEntry](),
consensusManager: consensusManager,
validatorMonitor: validatorMonitor,
blobQuarantine: blobQuarantine,
getBeaconTime: getBeaconTime,
verifier: BatchVerifier(rng: rng, taskpool: taskpool)
)
Expand Down Expand Up @@ -564,7 +569,22 @@ proc storeBlock*(

for quarantined in self.consensusManager.quarantine[].pop(blck.get().root):
# Process the blocks that had the newly accepted block as parent
self[].addBlock(MsgSource.gossip, quarantined, BlobSidecars @[])
withBlck(quarantined):
when typeof(blck).toFork() < ConsensusFork.Deneb:
self[].addBlock(MsgSource.gossip, quarantined, BlobSidecars @[])
else:
if len(blck.message.body.blob_kzg_commitments) == 0:
self[].addBlock(MsgSource.gossip, quarantined, BlobSidecars @[])
else:
if self.blobQuarantine[].hasBlobs(blck):
let blobs = self.blobQuarantine[].popBlobs(blck.root)
self[].addBlock(MsgSource.gossip, quarantined, blobs)
else:
if not self.consensusManager.quarantine[].addBlobless(
dag.finalizedHead.slot, blck):
notice "Block quarantine full (blobless)",
blockRoot = shortLog(quarantined.root),
signature = shortLog(quarantined.signature)

return Result[BlockRef, (VerifierError, ProcessingStatus)].ok blck.get

Expand Down
Loading

0 comments on commit 021de18

Please sign in to comment.