Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset cursors and deduplicate blobs in controller #1261

Merged
merged 1 commit into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions api/docs/disperser_v2.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions api/docs/disperser_v2.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions api/docs/eigenda-protos.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions api/docs/eigenda-protos.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions api/grpc/disperser/v2/disperser_v2.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions api/proto/disperser/v2/disperser_v2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ enum BlobStatus {
// and in doing so requesting that the validators sign to acknowledge receipt of the blob.
// Requests that timeout or receive errors are resubmitted to DA nodes for some period of time set by the disperser,
// after which the BlobStatus becomes COMPLETE.
//
// Note: this status is not currently implemented, and is a placeholder for future functionality.
GATHERING_SIGNATURES = 3;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this now implemented?

Copy link
Contributor Author

@ian-shim ian-shim Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It moves a blob to this state as soon as it starts collecting attestation from the DA network. It doesn't periodically update the attestation results though. It just returns empty attestation in this state until it's "complete" for now
cc @litt3


// COMPLETE means the blob has been dispersed to DA nodes, and the GATHERING_SIGNATURES period of time has completed.
Expand Down
16 changes: 11 additions & 5 deletions common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Client interface {
UpdateItemWithCondition(ctx context.Context, tableName string, key Key, item Item, condition expression.ConditionBuilder) (Item, error)
IncrementBy(ctx context.Context, tableName string, key Key, attr string, value uint64) (Item, error)
GetItem(ctx context.Context, tableName string, key Key) (Item, error)
GetItems(ctx context.Context, tableName string, keys []Key) ([]Item, error)
GetItems(ctx context.Context, tableName string, keys []Key, consistentRead bool) ([]Item, error)
QueryIndex(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpressionValues) ([]Item, error)
Query(ctx context.Context, tableName string, keyCondition string, expAttributeValues ExpressionValues) ([]Item, error)
QueryWithInput(ctx context.Context, input *dynamodb.QueryInput) ([]Item, error)
Expand Down Expand Up @@ -276,8 +276,8 @@ func (c *client) GetItem(ctx context.Context, tableName string, key Key) (Item,

// GetItems returns the items for the given keys
// Note: ordering of items is not guaranteed
func (c *client) GetItems(ctx context.Context, tableName string, keys []Key) ([]Item, error) {
items, err := c.readItems(ctx, tableName, keys)
func (c *client) GetItems(ctx context.Context, tableName string, keys []Key, consistentRead bool) ([]Item, error) {
items, err := c.readItems(ctx, tableName, keys, consistentRead)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -444,7 +444,12 @@ func (c *client) writeItems(ctx context.Context, tableName string, requestItems
return failedItems, nil
}

func (c *client) readItems(ctx context.Context, tableName string, keys []Key) ([]Item, error) {
func (c *client) readItems(
ctx context.Context,
tableName string,
keys []Key,
consistentRead bool,
) ([]Item, error) {
startIndex := 0
items := make([]Item, 0)
for startIndex < len(keys) {
Expand All @@ -454,7 +459,8 @@ func (c *client) readItems(ctx context.Context, tableName string, keys []Key) ([
output, err := c.dynamoClient.BatchGetItem(ctx, &dynamodb.BatchGetItemInput{
RequestItems: map[string]types.KeysAndAttributes{
tableName: {
Keys: keysBatch,
Keys: keysBatch,
ConsistentRead: aws.Bool(consistentRead),
},
},
})
Expand Down
2 changes: 1 addition & 1 deletion common/aws/dynamodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func TestBatchOperations(t *testing.T) {
}
}

fetchedItems, err := dynamoClient.GetItems(ctx, tableName, keys)
fetchedItems, err := dynamoClient.GetItems(ctx, tableName, keys, true)
assert.NoError(t, err)
assert.Len(t, fetchedItems, numItems)
blobKeys := make([]string, numItems)
Expand Down
2 changes: 1 addition & 1 deletion common/aws/mock/dynamodb_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (c *MockDynamoDBClient) GetItem(ctx context.Context, tableName string, key
return args.Get(0).(dynamodb.Item), args.Error(1)
}

func (c *MockDynamoDBClient) GetItems(ctx context.Context, tableName string, keys []dynamodb.Key) ([]dynamodb.Item, error) {
func (c *MockDynamoDBClient) GetItems(ctx context.Context, tableName string, keys []dynamodb.Key, consistentRead bool) ([]dynamodb.Item, error) {
args := c.Called()
return args.Get(0).([]dynamodb.Item), args.Error(1)
}
Expand Down
16 changes: 12 additions & 4 deletions core/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,14 +350,22 @@ func (a *Attestation) ToProtobuf() (*disperserpb.Attestation, error) {
quorumResults[i] = a.QuorumResults[q]
}

apkG2Bytes := a.APKG2.Bytes()
sigmaBytes := a.Sigma.Bytes()
var apkG2Bytes []byte
var sigmaBytes []byte
if a.APKG2 != nil {
b := a.APKG2.Bytes()
apkG2Bytes = b[:]
}
if a.Sigma != nil {
b := a.Sigma.Bytes()
sigmaBytes = b[:]
}

return &disperserpb.Attestation{
NonSignerPubkeys: nonSignerPubKeys,
ApkG2: apkG2Bytes[:],
ApkG2: apkG2Bytes,
QuorumApks: quorumAPKs,
Sigma: sigmaBytes[:],
Sigma: sigmaBytes,
QuorumNumbers: quorumNumbers,
QuorumSignedPercentages: quorumResults,
}, nil
Expand Down
25 changes: 25 additions & 0 deletions core/v2/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v2_test
import (
"math/big"
"testing"
"time"

"github.com/Layr-Labs/eigenda/core"
v2 "github.com/Layr-Labs/eigenda/core/v2"
Expand Down Expand Up @@ -120,3 +121,27 @@ func TestConvertBlobCertToFromProtobuf(t *testing.T) {

assert.Equal(t, blobCert, newBlobCert)
}

func TestAttestationToProtobuf(t *testing.T) {
zeroAttestation := &v2.Attestation{
BatchHeader: &v2.BatchHeader{
BatchRoot: [32]byte{1, 1, 1},
ReferenceBlockNumber: 100,
},
AttestedAt: uint64(time.Now().UnixNano()),
NonSignerPubKeys: nil,
APKG2: nil,
QuorumAPKs: nil,
Sigma: nil,
QuorumNumbers: nil,
QuorumResults: nil,
}
attestationProto, err := zeroAttestation.ToProtobuf()
assert.NoError(t, err)
assert.Empty(t, attestationProto.NonSignerPubkeys)
assert.Empty(t, attestationProto.ApkG2)
assert.Empty(t, attestationProto.QuorumApks)
assert.Empty(t, attestationProto.Sigma)
assert.Empty(t, attestationProto.QuorumNumbers)
assert.Empty(t, attestationProto.QuorumSignedPercentages)
}
10 changes: 10 additions & 0 deletions disperser/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/Layr-Labs/eigenda/core/eth"
"github.com/Layr-Labs/eigenda/core/indexer"
"github.com/Layr-Labs/eigenda/core/thegraph"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/disperser/cmd/controller/flags"
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/disperser/controller"
Expand Down Expand Up @@ -104,6 +105,7 @@ func RunController(ctx *cli.Context) error {
return fmt.Errorf("failed to create encoder client: %v", err)
}
encodingPool := workerpool.New(config.NumConcurrentEncodingRequests)
encodingManagerBlobSet := controller.NewBlobSet()
encodingManager, err := controller.NewEncodingManager(
&config.EncodingManagerConfig,
blobMetadataStore,
Expand All @@ -112,6 +114,7 @@ func RunController(ctx *cli.Context) error {
chainReader,
logger,
metricsRegistry,
encodingManagerBlobSet,
)
if err != nil {
return fmt.Errorf("failed to create encoding manager: %v", err)
Expand Down Expand Up @@ -169,6 +172,11 @@ func RunController(ctx *cli.Context) error {
if err != nil {
return fmt.Errorf("failed to create node client manager: %v", err)
}
beforeDispatch := func(blobKey corev2.BlobKey) error {
encodingManagerBlobSet.RemoveBlob(blobKey)
return nil
}
dispatcherBlobSet := controller.NewBlobSet()
dispatcher, err := controller.NewDispatcher(
&config.DispatcherConfig,
blobMetadataStore,
Expand All @@ -178,6 +186,8 @@ func RunController(ctx *cli.Context) error {
nodeClientManager,
logger,
metricsRegistry,
beforeDispatch,
dispatcherBlobSet,
)
if err != nil {
return fmt.Errorf("failed to create dispatcher: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion disperser/common/blobstore/blob_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *BlobMetadataStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []
"MetadataHash": &types.AttributeValueMemberS{Value: blobKeys[i].MetadataHash},
}
}
items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys)
items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys, false)
if err != nil {
return nil, err
}
Expand Down
29 changes: 5 additions & 24 deletions disperser/common/v2/blobstore/dynamo_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (s *BlobMetadataStore) UpdateBlobStatus(ctx context.Context, blobKey corev2
return fmt.Errorf("%w: blob already in status %s", common.ErrAlreadyExists, status.String())
}

return fmt.Errorf("%w: invalid status transition to %s", ErrInvalidStateTransition, status.String())
return fmt.Errorf("%w: invalid status transition from %s to %s", ErrInvalidStateTransition, blob.BlobStatus.String(), status.String())
}

return err
Expand Down Expand Up @@ -594,23 +594,7 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatusPaginated(

lastEvaludatedKey := res.LastEvaluatedKey
if lastEvaludatedKey == nil {
lastItem := res.Items[len(res.Items)-1]
u, ok := lastItem["UpdatedAt"].(*types.AttributeValueMemberN)
if !ok {
return nil, nil, fmt.Errorf("expected *types.AttributeValueMemberN for UpdatedAt, got %T", u)
}
updatedAt, err := strconv.ParseUint(u.Value, 10, 64)
if err != nil {
return nil, nil, err
}
bk, err := UnmarshalBlobKey(lastItem)
if err != nil {
return nil, nil, err
}
return metadata, &StatusIndexCursor{
BlobKey: &bk,
UpdatedAt: updatedAt,
}, nil
return metadata, nil, nil
}

newCursor := StatusIndexCursor{}
Expand Down Expand Up @@ -710,7 +694,7 @@ func (s *BlobMetadataStore) GetBlobCertificates(ctx context.Context, blobKeys []
}
}

items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys)
items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys, true)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -897,11 +881,8 @@ func (s *BlobMetadataStore) PutAttestation(ctx context.Context, attestation *cor
return err
}

err = s.dynamoDBClient.PutItemWithCondition(ctx, s.tableName, item, "attribute_not_exists(PK) AND attribute_not_exists(SK)", nil, nil)
if errors.Is(err, commondynamodb.ErrConditionFailed) {
return common.ErrAlreadyExists
}

// Allow overwrite of existing attestation
err = s.dynamoDBClient.PutItem(ctx, s.tableName, item)
return err
}

Expand Down
49 changes: 41 additions & 8 deletions disperser/common/v2/blobstore/dynamo_metadata_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,16 +876,20 @@ func TestBlobMetadataStoreGetBlobMetadataByStatusPaginated(t *testing.T) {
require.Equal(t, cursor.UpdatedAt, expectedCursors[i].UpdatedAt)
} else {
require.Len(t, metadata, numBlobs%pageSize)
require.Equal(t, cursor.BlobKey, &keys[numBlobs-1])
require.Equal(t, cursor.UpdatedAt, metadataList[numBlobs-1].UpdatedAt)
require.Nil(t, cursor)
}
i++
}
lastCursor := cursor

for i := 0; i < numBlobs; i++ {
err = blobMetadataStore.UpdateBlobStatus(ctx, keys[i], v2.GatheringSignatures)
require.NoError(t, err)
}

metadata, cursor, err = blobMetadataStore.GetBlobMetadataByStatusPaginated(ctx, v2.Encoded, cursor, int32(pageSize))
require.NoError(t, err)
require.Len(t, metadata, 0)
require.Equal(t, cursor, lastCursor)
require.Nil(t, cursor)

deleteItems(t, dynamoKeys)
}
Expand Down Expand Up @@ -1310,16 +1314,45 @@ func TestBlobMetadataStoreBatchAttestation(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, attestation, fetchedAttestation)

// attempt to put attestation with the same key should fail
err = blobMetadataStore.PutAttestation(ctx, attestation)
assert.ErrorIs(t, err, common.ErrAlreadyExists)

// attempt to retrieve batch header and attestation at the same time
fetchedHeader, fetchedAttestation, err = blobMetadataStore.GetSignedBatch(ctx, bhh)
assert.NoError(t, err)
assert.Equal(t, h, fetchedHeader)
assert.Equal(t, attestation, fetchedAttestation)

// overwrite existing attestation
updatedAttestation := &corev2.Attestation{
BatchHeader: h,
AttestedAt: uint64(time.Now().UnixNano()),
NonSignerPubKeys: []*core.G1Point{
core.NewG1Point(big.NewInt(1), big.NewInt(2)),
},
APKG2: apk,
QuorumAPKs: map[uint8]*core.G1Point{
0: core.NewG1Point(big.NewInt(5), big.NewInt(6)),
1: core.NewG1Point(big.NewInt(7), big.NewInt(8)),
},
Sigma: &core.Signature{
G1Point: core.NewG1Point(big.NewInt(9), big.NewInt(10)),
},
QuorumNumbers: []core.QuorumID{0, 1},
QuorumResults: map[uint8]uint8{
0: 100,
1: 90,
},
}

err = blobMetadataStore.PutAttestation(ctx, updatedAttestation)
assert.NoError(t, err)
fetchedAttestation, err = blobMetadataStore.GetAttestation(ctx, bhh)
assert.NoError(t, err)
assert.Equal(t, updatedAttestation, fetchedAttestation)

fetchedHeader, fetchedAttestation, err = blobMetadataStore.GetSignedBatch(ctx, bhh)
assert.NoError(t, err)
assert.Equal(t, h, fetchedHeader)
assert.Equal(t, updatedAttestation, fetchedAttestation)

deleteItems(t, []commondynamodb.Key{
{
"PK": &types.AttributeValueMemberS{Value: "BatchHeader#" + hex.EncodeToString(bhh[:])},
Expand Down
Loading
Loading