Reset cursors and deduplicate blobs in controller (#1261)

ian-shim authored Feb 13, 2025
1 parent 8f02ba6 commit 5ae24bc
Showing 25 changed files with 529 additions and 97 deletions.
4 changes: 1 addition & 3 deletions api/docs/disperser_v2.html

Generated file; diff not rendered.

4 changes: 1 addition & 3 deletions api/docs/disperser_v2.md

Generated file; diff not rendered.

4 changes: 1 addition & 3 deletions api/docs/eigenda-protos.html

Generated file; diff not rendered.

4 changes: 1 addition & 3 deletions api/docs/eigenda-protos.md

Generated file; diff not rendered.

2 changes: 0 additions & 2 deletions api/grpc/disperser/v2/disperser_v2.pb.go

Generated file; diff not rendered.

2 changes: 0 additions & 2 deletions api/proto/disperser/v2/disperser_v2.proto
@@ -157,8 +157,6 @@ enum BlobStatus {
// and in doing so requesting that the validators sign to acknowledge receipt of the blob.
// Requests that timeout or receive errors are resubmitted to DA nodes for some period of time set by the disperser,
// after which the BlobStatus becomes COMPLETE.
//
// Note: this status is not currently implemented, and is a placeholder for future functionality.
GATHERING_SIGNATURES = 3;

// COMPLETE means the blob has been dispersed to DA nodes, and the GATHERING_SIGNATURES period of time has completed.
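Since GATHERING_SIGNATURES is now an implemented status rather than a placeholder, clients polling the disperser should treat it as an in-flight state. A hedged, illustrative sketch (not part of this commit; the import alias and helper name are assumptions, and other enum values are deliberately left out):

package example

import (
	disperserpb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
)

// shouldKeepPolling interprets the two statuses shown in the hunk above.
// Queued, encoded, and failure statuses are out of scope for this sketch and
// are treated as "keep polling" for simplicity.
func shouldKeepPolling(status disperserpb.BlobStatus) bool {
	switch status {
	case disperserpb.BlobStatus_GATHERING_SIGNATURES:
		// The blob has been dispersed and validator signatures are still
		// being collected.
		return true
	case disperserpb.BlobStatus_COMPLETE:
		// The gathering period has elapsed; the result is final.
		return false
	default:
		return true
	}
}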
16 changes: 11 additions & 5 deletions common/aws/dynamodb/client.go
@@ -56,7 +56,7 @@ type Client interface {
UpdateItemWithCondition(ctx context.Context, tableName string, key Key, item Item, condition expression.ConditionBuilder) (Item, error)
IncrementBy(ctx context.Context, tableName string, key Key, attr string, value uint64) (Item, error)
GetItem(ctx context.Context, tableName string, key Key) (Item, error)
GetItems(ctx context.Context, tableName string, keys []Key) ([]Item, error)
GetItems(ctx context.Context, tableName string, keys []Key, consistentRead bool) ([]Item, error)
QueryIndex(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpressionValues) ([]Item, error)
Query(ctx context.Context, tableName string, keyCondition string, expAttributeValues ExpressionValues) ([]Item, error)
QueryWithInput(ctx context.Context, input *dynamodb.QueryInput) ([]Item, error)
@@ -276,8 +276,8 @@ func (c *client) GetItem(ctx context.Context, tableName string, key Key) (Item,

// GetItems returns the items for the given keys
// Note: ordering of items is not guaranteed
func (c *client) GetItems(ctx context.Context, tableName string, keys []Key) ([]Item, error) {
items, err := c.readItems(ctx, tableName, keys)
func (c *client) GetItems(ctx context.Context, tableName string, keys []Key, consistentRead bool) ([]Item, error) {
items, err := c.readItems(ctx, tableName, keys, consistentRead)
if err != nil {
return nil, err
}
@@ -444,7 +444,7 @@ func (c *client) writeItems(ctx context.Context, tableName string, requestItems
return failedItems, nil
}

func (c *client) readItems(ctx context.Context, tableName string, keys []Key) ([]Item, error) {
func (c *client) readItems(
ctx context.Context,
tableName string,
keys []Key,
consistentRead bool,
) ([]Item, error) {
startIndex := 0
items := make([]Item, 0)
for startIndex < len(keys) {
@@ -454,7 +459,8 @@ func (c *client) readItems(ctx context.Context, tableName string, keys []Key) ([
output, err := c.dynamoClient.BatchGetItem(ctx, &dynamodb.BatchGetItemInput{
RequestItems: map[string]types.KeysAndAttributes{
tableName: {
Keys: keysBatch,
Keys: keysBatch,
ConsistentRead: aws.Bool(consistentRead),
},
},
})
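The new consistentRead flag maps directly onto DynamoDB's ConsistentRead option on BatchGetItem. A hedged usage sketch (illustrative callers, not part of this commit; "BlobMetadata" is a placeholder table name):

package example

import (
	"context"

	commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
)

// fetchFresh requests a strongly consistent batch read, guaranteeing that
// items written immediately before the call are visible, at roughly double
// the read-capacity cost.
func fetchFresh(ctx context.Context, client commondynamodb.Client, keys []commondynamodb.Key) ([]commondynamodb.Item, error) {
	return client.GetItems(ctx, "BlobMetadata", keys, true)
}

// fetchCheap accepts eventually consistent reads where slightly stale data is
// tolerable, as GetBulkBlobMetadata does further down in this commit.
func fetchCheap(ctx context.Context, client commondynamodb.Client, keys []commondynamodb.Key) ([]commondynamodb.Item, error) {
	return client.GetItems(ctx, "BlobMetadata", keys, false)
}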
2 changes: 1 addition & 1 deletion common/aws/dynamodb/client_test.go
@@ -296,7 +296,7 @@ func TestBatchOperations(t *testing.T) {
}
}

fetchedItems, err := dynamoClient.GetItems(ctx, tableName, keys)
fetchedItems, err := dynamoClient.GetItems(ctx, tableName, keys, true)
assert.NoError(t, err)
assert.Len(t, fetchedItems, numItems)
blobKeys := make([]string, numItems)
2 changes: 1 addition & 1 deletion common/aws/mock/dynamodb_client.go
@@ -59,7 +59,7 @@ func (c *MockDynamoDBClient) GetItem(ctx context.Context, tableName string, key
return args.Get(0).(dynamodb.Item), args.Error(1)
}

func (c *MockDynamoDBClient) GetItems(ctx context.Context, tableName string, keys []dynamodb.Key) ([]dynamodb.Item, error) {
func (c *MockDynamoDBClient) GetItems(ctx context.Context, tableName string, keys []dynamodb.Key, consistentRead bool) ([]dynamodb.Item, error) {
args := c.Called()
return args.Get(0).([]dynamodb.Item), args.Error(1)
}
16 changes: 12 additions & 4 deletions core/v2/types.go
@@ -350,14 +350,22 @@ func (a *Attestation) ToProtobuf() (*disperserpb.Attestation, error) {
quorumResults[i] = a.QuorumResults[q]
}

apkG2Bytes := a.APKG2.Bytes()
sigmaBytes := a.Sigma.Bytes()
var apkG2Bytes []byte
var sigmaBytes []byte
if a.APKG2 != nil {
b := a.APKG2.Bytes()
apkG2Bytes = b[:]
}
if a.Sigma != nil {
b := a.Sigma.Bytes()
sigmaBytes = b[:]
}

return &disperserpb.Attestation{
NonSignerPubkeys: nonSignerPubKeys,
ApkG2: apkG2Bytes[:],
ApkG2: apkG2Bytes,
QuorumApks: quorumAPKs,
Sigma: sigmaBytes[:],
Sigma: sigmaBytes,
QuorumNumbers: quorumNumbers,
QuorumSignedPercentages: quorumResults,
}, nil
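The guard matters because Bytes() on these point types returns a fixed-size array and is called through a pointer, so a nil APKG2 or Sigma (for example, an attestation with no signers yet) would otherwise dereference a nil pointer. A minimal runnable sketch of the same pattern, using a hypothetical point type rather than the real curve types:

package main

import "fmt"

// point is a hypothetical stand-in for a curve point whose Bytes() returns a
// fixed-size array via a pointer receiver.
type point struct{ x byte }

func (p *point) Bytes() (out [32]byte) {
	out[0] = p.x // dereferences p: calling this through a nil pointer panics
	return
}

// serialize mirrors the guard added in ToProtobuf: slice the fixed-size array
// only when the point is present, otherwise leave the bytes field nil/empty.
func serialize(p *point) []byte {
	if p == nil {
		return nil
	}
	b := p.Bytes()
	return b[:]
}

func main() {
	fmt.Println(len(serialize(nil)), len(serialize(&point{x: 1}))) // 0 32
}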
25 changes: 25 additions & 0 deletions core/v2/types_test.go
@@ -3,6 +3,7 @@ package v2_test
import (
"math/big"
"testing"
"time"

"github.com/Layr-Labs/eigenda/core"
v2 "github.com/Layr-Labs/eigenda/core/v2"
@@ -120,3 +121,27 @@ func TestConvertBlobCertToFromProtobuf(t *testing.T) {

assert.Equal(t, blobCert, newBlobCert)
}

func TestAttestationToProtobuf(t *testing.T) {
zeroAttestation := &v2.Attestation{
BatchHeader: &v2.BatchHeader{
BatchRoot: [32]byte{1, 1, 1},
ReferenceBlockNumber: 100,
},
AttestedAt: uint64(time.Now().UnixNano()),
NonSignerPubKeys: nil,
APKG2: nil,
QuorumAPKs: nil,
Sigma: nil,
QuorumNumbers: nil,
QuorumResults: nil,
}
attestationProto, err := zeroAttestation.ToProtobuf()
assert.NoError(t, err)
assert.Empty(t, attestationProto.NonSignerPubkeys)
assert.Empty(t, attestationProto.ApkG2)
assert.Empty(t, attestationProto.QuorumApks)
assert.Empty(t, attestationProto.Sigma)
assert.Empty(t, attestationProto.QuorumNumbers)
assert.Empty(t, attestationProto.QuorumSignedPercentages)
}
10 changes: 10 additions & 0 deletions disperser/cmd/controller/main.go
@@ -20,6 +20,7 @@ import (
"github.com/Layr-Labs/eigenda/core/eth"
"github.com/Layr-Labs/eigenda/core/indexer"
"github.com/Layr-Labs/eigenda/core/thegraph"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/disperser/cmd/controller/flags"
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/disperser/controller"
@@ -104,6 +105,7 @@ func RunController(ctx *cli.Context) error {
return fmt.Errorf("failed to create encoder client: %v", err)
}
encodingPool := workerpool.New(config.NumConcurrentEncodingRequests)
encodingManagerBlobSet := controller.NewBlobSet()
encodingManager, err := controller.NewEncodingManager(
&config.EncodingManagerConfig,
blobMetadataStore,
@@ -112,6 +114,7 @@
chainReader,
logger,
metricsRegistry,
encodingManagerBlobSet,
)
if err != nil {
return fmt.Errorf("failed to create encoding manager: %v", err)
@@ -169,6 +172,11 @@ func RunController(ctx *cli.Context) error {
if err != nil {
return fmt.Errorf("failed to create node client manager: %v", err)
}
beforeDispatch := func(blobKey corev2.BlobKey) error {
encodingManagerBlobSet.RemoveBlob(blobKey)
return nil
}
dispatcherBlobSet := controller.NewBlobSet()
dispatcher, err := controller.NewDispatcher(
&config.DispatcherConfig,
blobMetadataStore,
@@ -178,6 +186,8 @@
nodeClientManager,
logger,
metricsRegistry,
beforeDispatch,
dispatcherBlobSet,
)
if err != nil {
return fmt.Errorf("failed to create dispatcher: %v", err)
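The blob sets wired in above are what deduplicate work between the encoding manager and the dispatcher, with the beforeDispatch hook releasing a blob from the encoding manager's set once it moves on to dispatch. A minimal sketch of such a set (only RemoveBlob appears in this diff; AddBlob and Contains are assumptions added here for illustration):

package example

import (
	"sync"

	corev2 "github.com/Layr-Labs/eigenda/core/v2"
)

type blobSet struct {
	mu    sync.Mutex
	blobs map[corev2.BlobKey]struct{}
}

func newBlobSet() *blobSet {
	return &blobSet{blobs: make(map[corev2.BlobKey]struct{})}
}

// AddBlob marks a blob as queued so later scans do not enqueue it twice.
func (s *blobSet) AddBlob(key corev2.BlobKey) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.blobs[key] = struct{}{}
}

// RemoveBlob releases a blob, e.g. from the beforeDispatch hook above, once it
// has moved past the encoding stage.
func (s *blobSet) RemoveBlob(key corev2.BlobKey) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.blobs, key)
}

// Contains reports whether the blob is already being tracked.
func (s *blobSet) Contains(key corev2.BlobKey) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	_, ok := s.blobs[key]
	return ok
}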
2 changes: 1 addition & 1 deletion disperser/common/blobstore/blob_metadata_store.go
@@ -91,7 +91,7 @@ func (s *BlobMetadataStore) GetBulkBlobMetadata(ctx context.Context, blobKeys []
"MetadataHash": &types.AttributeValueMemberS{Value: blobKeys[i].MetadataHash},
}
}
items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys)
items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys, false)
if err != nil {
return nil, err
}
29 changes: 5 additions & 24 deletions disperser/common/v2/blobstore/dynamo_metadata_store.go
@@ -219,7 +219,7 @@ func (s *BlobMetadataStore) UpdateBlobStatus(ctx context.Context, blobKey corev2
return fmt.Errorf("%w: blob already in status %s", common.ErrAlreadyExists, status.String())
}

return fmt.Errorf("%w: invalid status transition to %s", ErrInvalidStateTransition, status.String())
return fmt.Errorf("%w: invalid status transition from %s to %s", ErrInvalidStateTransition, blob.BlobStatus.String(), status.String())
}

return err
@@ -594,23 +594,7 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatusPaginated(

lastEvaludatedKey := res.LastEvaluatedKey
if lastEvaludatedKey == nil {
lastItem := res.Items[len(res.Items)-1]
u, ok := lastItem["UpdatedAt"].(*types.AttributeValueMemberN)
if !ok {
return nil, nil, fmt.Errorf("expected *types.AttributeValueMemberN for UpdatedAt, got %T", u)
}
updatedAt, err := strconv.ParseUint(u.Value, 10, 64)
if err != nil {
return nil, nil, err
}
bk, err := UnmarshalBlobKey(lastItem)
if err != nil {
return nil, nil, err
}
return metadata, &StatusIndexCursor{
BlobKey: &bk,
UpdatedAt: updatedAt,
}, nil
return metadata, nil, nil
}

newCursor := StatusIndexCursor{}
@@ -710,7 +694,7 @@ func (s *BlobMetadataStore) GetBlobCertificates(ctx context.Context, blobKeys []
}
}

items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys)
items, err := s.dynamoDBClient.GetItems(ctx, s.tableName, keys, true)
if err != nil {
return nil, nil, err
}
@@ -897,11 +881,8 @@ func (s *BlobMetadataStore) PutAttestation(ctx context.Context, attestation *cor
return err
}

err = s.dynamoDBClient.PutItemWithCondition(ctx, s.tableName, item, "attribute_not_exists(PK) AND attribute_not_exists(SK)", nil, nil)
if errors.Is(err, commondynamodb.ErrConditionFailed) {
return common.ErrAlreadyExists
}

// Allow overwrite of existing attestation
err = s.dynamoDBClient.PutItem(ctx, s.tableName, item)
return err
}

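With this change an exhausted page scan returns a nil cursor instead of a cursor pointing at the last item, which lets callers reset and rescan from the beginning (the "reset cursors" in the commit title). A hedged caller-side sketch of that pattern; the interface and types below are simplified local stand-ins, not the real blobstore signatures:

package example

import "context"

type blobMetadata struct{}
type statusIndexCursor struct{}

type metadataStore interface {
	GetEncodedBlobsPaginated(ctx context.Context, cursor *statusIndexCursor, limit int32) ([]*blobMetadata, *statusIndexCursor, error)
}

// pollEncodedBlobs repeatedly pages through encoded blobs; when the store
// signals exhaustion with a nil cursor, the scan restarts from the top so
// blobs that reach the Encoded state later are still picked up.
func pollEncodedBlobs(ctx context.Context, store metadataStore, pageSize int32, process func([]*blobMetadata)) error {
	var cursor *statusIndexCursor
	for ctx.Err() == nil {
		metadata, next, err := store.GetEncodedBlobsPaginated(ctx, cursor, pageSize)
		if err != nil {
			return err
		}
		process(metadata)
		cursor = next // nil resets the scan to the beginning on the next pass
	}
	return ctx.Err()
}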
49 changes: 41 additions & 8 deletions disperser/common/v2/blobstore/dynamo_metadata_store_test.go
@@ -876,16 +876,20 @@ func TestBlobMetadataStoreGetBlobMetadataByStatusPaginated(t *testing.T) {
require.Equal(t, cursor.UpdatedAt, expectedCursors[i].UpdatedAt)
} else {
require.Len(t, metadata, numBlobs%pageSize)
require.Equal(t, cursor.BlobKey, &keys[numBlobs-1])
require.Equal(t, cursor.UpdatedAt, metadataList[numBlobs-1].UpdatedAt)
require.Nil(t, cursor)
}
i++
}
lastCursor := cursor

for i := 0; i < numBlobs; i++ {
err = blobMetadataStore.UpdateBlobStatus(ctx, keys[i], v2.GatheringSignatures)
require.NoError(t, err)
}

metadata, cursor, err = blobMetadataStore.GetBlobMetadataByStatusPaginated(ctx, v2.Encoded, cursor, int32(pageSize))
require.NoError(t, err)
require.Len(t, metadata, 0)
require.Equal(t, cursor, lastCursor)
require.Nil(t, cursor)

deleteItems(t, dynamoKeys)
}
@@ -1310,16 +1314,45 @@ func TestBlobMetadataStoreBatchAttestation(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, attestation, fetchedAttestation)

// attempt to put attestation with the same key should fail
err = blobMetadataStore.PutAttestation(ctx, attestation)
assert.ErrorIs(t, err, common.ErrAlreadyExists)

// attempt to retrieve batch header and attestation at the same time
fetchedHeader, fetchedAttestation, err = blobMetadataStore.GetSignedBatch(ctx, bhh)
assert.NoError(t, err)
assert.Equal(t, h, fetchedHeader)
assert.Equal(t, attestation, fetchedAttestation)

// overwrite existing attestation
updatedAttestation := &corev2.Attestation{
BatchHeader: h,
AttestedAt: uint64(time.Now().UnixNano()),
NonSignerPubKeys: []*core.G1Point{
core.NewG1Point(big.NewInt(1), big.NewInt(2)),
},
APKG2: apk,
QuorumAPKs: map[uint8]*core.G1Point{
0: core.NewG1Point(big.NewInt(5), big.NewInt(6)),
1: core.NewG1Point(big.NewInt(7), big.NewInt(8)),
},
Sigma: &core.Signature{
G1Point: core.NewG1Point(big.NewInt(9), big.NewInt(10)),
},
QuorumNumbers: []core.QuorumID{0, 1},
QuorumResults: map[uint8]uint8{
0: 100,
1: 90,
},
}

err = blobMetadataStore.PutAttestation(ctx, updatedAttestation)
assert.NoError(t, err)
fetchedAttestation, err = blobMetadataStore.GetAttestation(ctx, bhh)
assert.NoError(t, err)
assert.Equal(t, updatedAttestation, fetchedAttestation)

fetchedHeader, fetchedAttestation, err = blobMetadataStore.GetSignedBatch(ctx, bhh)
assert.NoError(t, err)
assert.Equal(t, h, fetchedHeader)
assert.Equal(t, updatedAttestation, fetchedAttestation)

deleteItems(t, []commondynamodb.Key{
{
"PK": &types.AttributeValueMemberS{Value: "BatchHeader#" + hex.EncodeToString(bhh[:])},
(Remaining changed files not loaded.)
