Reset cursors and deduplicate blobs in controller #1261
//
// Note: this status is not currently implemented, and is a placeholder for future functionality.
GATHERING_SIGNATURES = 3;
Is this now implemented?
It moves a blob to this state as soon as it starts collecting attestations from the DA network. It doesn't periodically update the attestation results, though. For now, it just returns an empty attestation in this state until it's "complete".
cc @litt3
disperser/controller/blob_queue.go
Outdated
v2 "github.com/Layr-Labs/eigenda/core/v2"
)

type BlobQueue interface {
Nit, can you add a one or two sentence godoc to this interface to describe its purpose?
Perhaps BlobSet would be a better name? This struct doesn't implement a queue.
will update
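To make the two review suggestions above concrete (a short godoc and the BlobSet name), here is a minimal sketch of what such an interface might look like. The `BlobKey` type, the method names, and the mutex-backed implementation are all assumptions for illustration; the actual interface in the PR is not shown in this thread.

```go
package main

import (
	"fmt"
	"sync"
)

// BlobKey is a hypothetical stand-in for the real blob key type.
type BlobKey [32]byte

// BlobSet tracks the keys of blobs that are currently in flight so that
// duplicates fetched from the metadata store can be detected and skipped.
// It is a set, not a queue: it makes no ordering guarantees.
// The method names below are illustrative assumptions.
type BlobSet interface {
	// Add records key and reports whether it was not already present.
	Add(key BlobKey) bool
	// Remove stops tracking key; removing an absent key is a no-op.
	Remove(key BlobKey)
	// Contains reports whether key is currently tracked.
	Contains(key BlobKey) bool
	// Size returns the number of tracked keys.
	Size() int
}

// blobSet is a mutex-protected map, safe for use from multiple goroutines.
type blobSet struct {
	mu   sync.Mutex
	keys map[BlobKey]struct{}
}

func NewBlobSet() BlobSet {
	return &blobSet{keys: make(map[BlobKey]struct{})}
}

func (s *blobSet) Add(key BlobKey) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.keys[key]; ok {
		return false
	}
	s.keys[key] = struct{}{}
	return true
}

func (s *blobSet) Remove(key BlobKey) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.keys, key)
}

func (s *blobSet) Contains(key BlobKey) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, ok := s.keys[key]
	return ok
}

func (s *blobSet) Size() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.keys)
}

func main() {
	set := NewBlobSet()
	key := BlobKey{0x01}
	fmt.Println(set.Add(key)) // true: first time this key is seen
	fmt.Println(set.Add(key)) // false: duplicate
	set.Remove(key)
	fmt.Println(set.Size()) // 0
}
```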
disperser/controller/dispatcher.go
Outdated
Thinking through potential interleavings, I'm concerned about the following race condition.
Race Condition
The following two methods can execute concurrently:
- HandleBatch(): this method gets blobs from dynamo, and may accidentally fetch duplicates. Duplicate blobs are detected and discarded.
- HandleSignatures(): this method looks at new signatures. If enough time has passed, it will update the blob status and remove the blob from the ones we are tracking. It also removes the blob from the set used to detect duplicates.
Scenario:
- HandleBatch() reads blob X from dynamo. X is not a duplicate, and so the dispatcher begins tracking blob X.
- Later on, HandleBatch() and HandleSignatures() are being processed in parallel.
- HandleBatch() reads blob X again. It has not yet discarded X as a duplicate.
- HandleSignatures() identifies blob X as having become complete, and removes blob X from the set used to detect duplicates.
- HandleBatch() checks to see if X is in the duplicate set, but does not see it. It starts tracking X again.
Possible Fixes
Option 1: fewer threads
Instead of calling HandleSignatures() in a goroutine, call it directly in the dispatcher's main loop. It may be the case that this parallelism is not strictly necessary.
Option 2: one thread manages the set tracking duplicates
- Only the HandleBatch() goroutine is allowed to make updates to the set tracking duplicates.
- When HandleSignatures() decides that a blob is complete/failed, instead of removing it from the set tracking duplicates, it puts the blob key into a channel. It is important to do this AFTER the blob has been marked as complete/failed in dynamo.
- Every time HandleBatch() runs, it removes blob keys from the channel, and removes them from the duplicate set. It is important that it does this BEFORE it queries dynamo.
- Every blob removed from the set tracking duplicates is guaranteed to be marked as complete/failed in dynamo.
- This change will allow you to remove the mutex from the set that tracks duplicates, since it will only be accessed by a single goroutine.
Good point.
HandleSignatures first moves the blob to the GatheringSignatures state before spending many seconds waiting for signatures, after which it moves the blob to Complete/Failed, and only then removes the blob from the duplicate set.
The dedup logic runs on pure CPU, no IO. For it to interfere with HandleSignatures as in the scenario, the dedup logic would need to take longer than the entire time HandleSignatures takes (10-30s).
I don't think option 1 is reasonable given the e2e latency requirements. I don't think option 2 resolves this race condition either; actually, I think it makes the problem worse because the finished blob isn't removed from the duplicate set as soon as possible.
If this race condition ever happens, the worst case is that a blob gets dispersed twice (included in 2 batches), which is manageable. I'd rather keep this logic simple if there aren't other concerns. Wdyt?
Why are these changes needed?
- GetBlobMetadataByStatusPaginated: Cursor is reset to nil when all records have been enumerated. This ensures that any blobs that have not been enumerated will eventually be picked up (in the controller's encoding manager and dispatcher).
- EncodingManager and Dispatcher keep track of blobs in flight and deduplicate them.
- Dispatcher transitions blobs to GatheringSignatures status and creates an empty attestation object before sending payloads to the DA network. These blobs are finalized to Complete or Failed status after the dispersal is complete (has received all signatures or timed out).
Checks