Skip to content

Commit

Permalink
reset cursors and deduplicate blobs in controller
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Feb 12, 2025
1 parent c5851d6 commit 474165c
Show file tree
Hide file tree
Showing 24 changed files with 520 additions and 93 deletions.
4 changes: 1 addition & 3 deletions api/docs/disperser_v2.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions api/docs/disperser_v2.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions api/docs/eigenda-protos.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions api/docs/eigenda-protos.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions api/grpc/disperser/v2/disperser_v2.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions api/proto/disperser/v2/disperser_v2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ enum BlobStatus {
// and in doing so requesting that the validators sign to acknowledge receipt of the blob.
// Requests that timeout or receive errors are resubmitted to DA nodes for some period of time set by the disperser,
// after which the BlobStatus becomes COMPLETE.
//
// Note: this status is not currently implemented, and is a placeholder for future functionality.
GATHERING_SIGNATURES = 3;

// COMPLETE means the blob has been dispersed to DA nodes, and the GATHERING_SIGNATURES period of time has completed.
Expand Down
16 changes: 11 additions & 5 deletions common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Client interface {
UpdateItemWithCondition(ctx context.Context, tableName string, key Key, item Item, condition expression.ConditionBuilder) (Item, error)
IncrementBy(ctx context.Context, tableName string, key Key, attr string, value uint64) (Item, error)
GetItem(ctx context.Context, tableName string, key Key) (Item, error)
GetItems(ctx context.Context, tableName string, keys []Key) ([]Item, error)
GetItems(ctx context.Context, tableName string, keys []Key, consistentRead bool) ([]Item, error)
QueryIndex(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpressionValues) ([]Item, error)
Query(ctx context.Context, tableName string, keyCondition string, expAttributeValues ExpressionValues) ([]Item, error)
QueryWithInput(ctx context.Context, input *dynamodb.QueryInput) ([]Item, error)
Expand Down Expand Up @@ -276,8 +276,8 @@ func (c *client) GetItem(ctx context.Context, tableName string, key Key) (Item,

// GetItems returns the items for the given keys
// Note: ordering of items is not guaranteed
func (c *client) GetItems(ctx context.Context, tableName string, keys []Key) ([]Item, error) {
items, err := c.readItems(ctx, tableName, keys)
func (c *client) GetItems(ctx context.Context, tableName string, keys []Key, consistentRead bool) ([]Item, error) {
items, err := c.readItems(ctx, tableName, keys, consistentRead)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -444,7 +444,12 @@ func (c *client) writeItems(ctx context.Context, tableName string, requestItems
return failedItems, nil
}

func (c *client) readItems(ctx context.Context, tableName string, keys []Key) ([]Item, error) {
func (c *client) readItems(
ctx context.Context,
tableName string,
keys []Key,
consistentRead bool,
) ([]Item, error) {
startIndex := 0
items := make([]Item, 0)
for startIndex < len(keys) {
Expand All @@ -454,7 +459,8 @@ func (c *client) readItems(ctx context.Context, tableName string, keys []Key) ([
output, err := c.dynamoClient.BatchGetItem(ctx, &dynamodb.BatchGetItemInput{
RequestItems: map[string]types.KeysAndAttributes{
tableName: {
Keys: keysBatch,
Keys: keysBatch,
ConsistentRead: aws.Bool(consistentRead),
},
},
})
Expand Down
2 changes: 1 addition & 1 deletion common/aws/dynamodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func TestBatchOperations(t *testing.T) {
}
}

fetchedItems, err := dynamoClient.GetItems(ctx, tableName, keys)
fetchedItems, err := dynamoClient.GetItems(ctx, tableName, keys, true)
assert.NoError(t, err)
assert.Len(t, fetchedItems, numItems)
blobKeys := make([]string, numItems)
Expand Down
2 changes: 1 addition & 1 deletion common/aws/mock/dynamodb_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (c *MockDynamoDBClient) GetItem(ctx context.Context, tableName string, key
return args.Get(0).(dynamodb.Item), args.Error(1)
}

func (c *MockDynamoDBClient) GetItems(ctx context.Context, tableName string, keys []dynamodb.Key) ([]dynamodb.Item, error) {
func (c *MockDynamoDBClient) GetItems(ctx context.Context, tableName string, keys []dynamodb.Key, consistentRead bool) ([]dynamodb.Item, error) {
args := c.Called()
return args.Get(0).([]dynamodb.Item), args.Error(1)
}
Expand Down
16 changes: 12 additions & 4 deletions core/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,14 +350,22 @@ func (a *Attestation) ToProtobuf() (*disperserpb.Attestation, error) {
quorumResults[i] = a.QuorumResults[q]
}

apkG2Bytes := a.APKG2.Bytes()
sigmaBytes := a.Sigma.Bytes()
var apkG2Bytes []byte
var sigmaBytes []byte
if a.APKG2 != nil {
b := a.APKG2.Bytes()
apkG2Bytes = b[:]
}
if a.Sigma != nil {
b := a.Sigma.Bytes()
sigmaBytes = b[:]
}

return &disperserpb.Attestation{
NonSignerPubkeys: nonSignerPubKeys,
ApkG2: apkG2Bytes[:],
ApkG2: apkG2Bytes,
QuorumApks: quorumAPKs,
Sigma: sigmaBytes[:],
Sigma: sigmaBytes,
QuorumNumbers: quorumNumbers,
QuorumSignedPercentages: quorumResults,
}, nil
Expand Down
25 changes: 25 additions & 0 deletions core/v2/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v2_test
import (
"math/big"
"testing"
"time"

"github.com/Layr-Labs/eigenda/core"
v2 "github.com/Layr-Labs/eigenda/core/v2"
Expand Down Expand Up @@ -120,3 +121,27 @@ func TestConvertBlobCertToFromProtobuf(t *testing.T) {

assert.Equal(t, blobCert, newBlobCert)
}

func TestAttestationToProtobuf(t *testing.T) {
zeroAttestation := &v2.Attestation{
BatchHeader: &v2.BatchHeader{
BatchRoot: [32]byte{1, 1, 1},
ReferenceBlockNumber: 100,
},
AttestedAt: uint64(time.Now().UnixNano()),
NonSignerPubKeys: nil,
APKG2: nil,
QuorumAPKs: nil,
Sigma: nil,
QuorumNumbers: nil,
QuorumResults: nil,
}
attestationProto, err := zeroAttestation.ToProtobuf()
assert.NoError(t, err)
assert.Empty(t, attestationProto.NonSignerPubkeys)
assert.Empty(t, attestationProto.ApkG2)
assert.Empty(t, attestationProto.QuorumApks)
assert.Empty(t, attestationProto.Sigma)
assert.Empty(t, attestationProto.QuorumNumbers)
assert.Empty(t, attestationProto.QuorumSignedPercentages)
}
10 changes: 10 additions & 0 deletions disperser/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/Layr-Labs/eigenda/core/eth"
"github.com/Layr-Labs/eigenda/core/indexer"
"github.com/Layr-Labs/eigenda/core/thegraph"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/disperser/cmd/controller/flags"
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/disperser/controller"
Expand Down Expand Up @@ -104,6 +105,7 @@ func RunController(ctx *cli.Context) error {
return fmt.Errorf("failed to create encoder client: %v", err)
}
encodingPool := workerpool.New(config.NumConcurrentEncodingRequests)
encodingManagerBlobQueue := controller.NewBlobQueue()
encodingManager, err := controller.NewEncodingManager(
&config.EncodingManagerConfig,
blobMetadataStore,
Expand All @@ -112,6 +114,7 @@ func RunController(ctx *cli.Context) error {
chainReader,
logger,
metricsRegistry,
encodingManagerBlobQueue,
)
if err != nil {
return fmt.Errorf("failed to create encoding manager: %v", err)
Expand Down Expand Up @@ -169,6 +172,11 @@ func RunController(ctx *cli.Context) error {
if err != nil {
return fmt.Errorf("failed to create node client manager: %v", err)
}
beforeDispatch := func(blobKey corev2.BlobKey) error {
encodingManagerBlobQueue.RemoveBlob(blobKey)
return nil
}
dispatcherBlobQueue := controller.NewBlobQueue()
dispatcher, err := controller.NewDispatcher(
&config.DispatcherConfig,
blobMetadataStore,
Expand All @@ -178,6 +186,8 @@ func RunController(ctx *cli.Context) error {
nodeClientManager,
logger,
metricsRegistry,
beforeDispatch,
dispatcherBlobQueue,
)
if err != nil {
return fmt.Errorf("failed to create dispatcher: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion disperser/common/blobstore/blob_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *BlobMetadataStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []
"MetadataHash": &types.AttributeValueMemberS{Value: blobKeys[i].MetadataHash},
}
}
items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys)
items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys, false)
if err != nil {
return nil, err
}
Expand Down
27 changes: 4 additions & 23 deletions disperser/common/v2/blobstore/dynamo_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,23 +594,7 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatusPaginated(

lastEvaludatedKey := res.LastEvaluatedKey
if lastEvaludatedKey == nil {
lastItem := res.Items[len(res.Items)-1]
u, ok := lastItem["UpdatedAt"].(*types.AttributeValueMemberN)
if !ok {
return nil, nil, fmt.Errorf("expected *types.AttributeValueMemberN for UpdatedAt, got %T", u)
}
updatedAt, err := strconv.ParseUint(u.Value, 10, 64)
if err != nil {
return nil, nil, err
}
bk, err := UnmarshalBlobKey(lastItem)
if err != nil {
return nil, nil, err
}
return metadata, &StatusIndexCursor{
BlobKey: &bk,
UpdatedAt: updatedAt,
}, nil
return metadata, nil, nil
}

newCursor := StatusIndexCursor{}
Expand Down Expand Up @@ -710,7 +694,7 @@ func (s *BlobMetadataStore) GetBlobCertificates(ctx context.Context, blobKeys []
}
}

items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys)
items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys, true)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -897,11 +881,8 @@ func (s *BlobMetadataStore) PutAttestation(ctx context.Context, attestation *cor
return err
}

err = s.dynamoDBClient.PutItemWithCondition(ctx, s.tableName, item, "attribute_not_exists(PK) AND attribute_not_exists(SK)", nil, nil)
if errors.Is(err, commondynamodb.ErrConditionFailed) {
return common.ErrAlreadyExists
}

// Allow overwrite of existing attestation
err = s.dynamoDBClient.PutItem(ctx, s.tableName, item)
return err
}

Expand Down
49 changes: 41 additions & 8 deletions disperser/common/v2/blobstore/dynamo_metadata_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,16 +876,20 @@ func TestBlobMetadataStoreGetBlobMetadataByStatusPaginated(t *testing.T) {
require.Equal(t, cursor.UpdatedAt, expectedCursors[i].UpdatedAt)
} else {
require.Len(t, metadata, numBlobs%pageSize)
require.Equal(t, cursor.BlobKey, &keys[numBlobs-1])
require.Equal(t, cursor.UpdatedAt, metadataList[numBlobs-1].UpdatedAt)
require.Nil(t, cursor)
}
i++
}
lastCursor := cursor

for i := 0; i < numBlobs; i++ {
err = blobMetadataStore.UpdateBlobStatus(ctx, keys[i], v2.GatheringSignatures)
require.NoError(t, err)
}

metadata, cursor, err = blobMetadataStore.GetBlobMetadataByStatusPaginated(ctx, v2.Encoded, cursor, int32(pageSize))
require.NoError(t, err)
require.Len(t, metadata, 0)
require.Equal(t, cursor, lastCursor)
require.Nil(t, cursor)

deleteItems(t, dynamoKeys)
}
Expand Down Expand Up @@ -1310,16 +1314,45 @@ func TestBlobMetadataStoreBatchAttestation(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, attestation, fetchedAttestation)

// attempt to put attestation with the same key should fail
err = blobMetadataStore.PutAttestation(ctx, attestation)
assert.ErrorIs(t, err, common.ErrAlreadyExists)

// attempt to retrieve batch header and attestation at the same time
fetchedHeader, fetchedAttestation, err = blobMetadataStore.GetSignedBatch(ctx, bhh)
assert.NoError(t, err)
assert.Equal(t, h, fetchedHeader)
assert.Equal(t, attestation, fetchedAttestation)

// overwrite existing attestation
updatedAttestation := &corev2.Attestation{
BatchHeader: h,
AttestedAt: uint64(time.Now().UnixNano()),
NonSignerPubKeys: []*core.G1Point{
core.NewG1Point(big.NewInt(1), big.NewInt(2)),
},
APKG2: apk,
QuorumAPKs: map[uint8]*core.G1Point{
0: core.NewG1Point(big.NewInt(5), big.NewInt(6)),
1: core.NewG1Point(big.NewInt(7), big.NewInt(8)),
},
Sigma: &core.Signature{
G1Point: core.NewG1Point(big.NewInt(9), big.NewInt(10)),
},
QuorumNumbers: []core.QuorumID{0, 1},
QuorumResults: map[uint8]uint8{
0: 100,
1: 90,
},
}

err = blobMetadataStore.PutAttestation(ctx, updatedAttestation)
assert.NoError(t, err)
fetchedAttestation, err = blobMetadataStore.GetAttestation(ctx, bhh)
assert.NoError(t, err)
assert.Equal(t, updatedAttestation, fetchedAttestation)

fetchedHeader, fetchedAttestation, err = blobMetadataStore.GetSignedBatch(ctx, bhh)
assert.NoError(t, err)
assert.Equal(t, h, fetchedHeader)
assert.Equal(t, updatedAttestation, fetchedAttestation)

deleteItems(t, []commondynamodb.Key{
{
"PK": &types.AttributeValueMemberS{Value: "BatchHeader#" + hex.EncodeToString(bhh[:])},
Expand Down
Loading

0 comments on commit 474165c

Please sign in to comment.