diff --git a/sdk/messaging/azeventhubs/CHANGELOG.md b/sdk/messaging/azeventhubs/CHANGELOG.md index fede8f8742ce..7cfbfb33ee54 100644 --- a/sdk/messaging/azeventhubs/CHANGELOG.md +++ b/sdk/messaging/azeventhubs/CHANGELOG.md @@ -14,6 +14,7 @@ - Potential leaks for $cbs and $management when there was a partial failure. (PR#20564) - Latest go-amqp changes have been merged in with fixes for robustness. - Sending a message to an entity that is full will no longer retry. (PR#20722) +- Checkpoint store handles multiple initial owners properly, allowing only one through. (PR#20727) ## 0.6.0 (2023-03-07) diff --git a/sdk/messaging/azeventhubs/checkpoints/blob_store.go b/sdk/messaging/azeventhubs/checkpoints/blob_store.go index 453265e77c60..e8a134643603 100644 --- a/sdk/messaging/azeventhubs/checkpoints/blob_store.go +++ b/sdk/messaging/azeventhubs/checkpoints/blob_store.go @@ -44,6 +44,9 @@ func NewBlobStore(containerClient *container.Client, options *BlobStoreOptions) // ClaimOwnership attempts to claim ownership of the partitions in partitionOwnership and returns // the actual partitions that were claimed. +// +// If we fail to claim ownership because of another update then it will be omitted from the +// returned slice of [Ownership]'s. It is not considered an error. func (b *BlobStore) ClaimOwnership(ctx context.Context, partitionOwnership []azeventhubs.Ownership, options *azeventhubs.ClaimOwnershipOptions) ([]azeventhubs.Ownership, error) { var ownerships []azeventhubs.Ownership @@ -54,13 +57,12 @@ func (b *BlobStore) ClaimOwnership(ctx context.Context, partitionOwnership []aze if err != nil { return nil, err } - lastModified, etag, err := b.setMetadata(ctx, blobName, newOwnershipBlobMetadata(po), po.ETag) + lastModified, etag, err := b.setOwnershipMetadata(ctx, blobName, po) if err != nil { - if bloberror.HasCode(err, bloberror.ConditionNotMet) { - // we can fail to claim ownership and that's okay - it's expected that clients will - // attempt to claim with whatever state they hold locally. If they fail it just means - // someone else claimed ownership before them. + if bloberror.HasCode(err, + bloberror.ConditionNotMet, // updated before we could update it + bloberror.BlobAlreadyExists) { // created before we could create it continue } @@ -179,6 +181,8 @@ func (b *BlobStore) ListOwnership(ctx context.Context, fullyQualifiedNamespace s } // UpdateCheckpoint updates a specific checkpoint with a sequence and offset. +// +// NOTE: This function doesn't attempt to prevent simultaneous checkpoint updates - ownership is assumed. func (b *BlobStore) UpdateCheckpoint(ctx context.Context, checkpoint azeventhubs.Checkpoint, options *azeventhubs.UpdateCheckpointOptions) error { blobName, err := nameForCheckpointBlob(checkpoint) @@ -186,18 +190,19 @@ func (b *BlobStore) UpdateCheckpoint(ctx context.Context, checkpoint azeventhubs return err } - _, _, err = b.setMetadata(ctx, blobName, newCheckpointBlobMetadata(checkpoint), nil) + _, _, err = b.setCheckpointMetadata(ctx, blobName, checkpoint) return err } -func (b *BlobStore) setMetadata(ctx context.Context, blobName string, blobMetadata map[string]*string, etag *azcore.ETag) (*time.Time, azcore.ETag, error) { +func (b *BlobStore) setOwnershipMetadata(ctx context.Context, blobName string, ownership azeventhubs.Ownership) (*time.Time, azcore.ETag, error) { + blobMetadata := newOwnershipBlobMetadata(ownership) blobClient := b.cc.NewBlockBlobClient(blobName) - if etag != nil { + if ownership.ETag != nil { setMetadataResp, err := blobClient.SetMetadata(ctx, blobMetadata, &blob.SetMetadataOptions{ AccessConditions: &blob.AccessConditions{ ModifiedAccessConditions: &blob.ModifiedAccessConditions{ - IfMatch: etag, + IfMatch: ownership.ETag, }, }, }) @@ -207,29 +212,52 @@ func (b *BlobStore) setMetadata(ctx context.Context, blobName string, blobMetada } return setMetadataResp.LastModified, *setMetadataResp.ETag, nil - } else { - setMetadataResp, err := blobClient.SetMetadata(ctx, blobMetadata, nil) + } - if err == nil { - return setMetadataResp.LastModified, *setMetadataResp.ETag, nil - } + uploadResp, err := blobClient.Upload(ctx, streaming.NopCloser(bytes.NewReader([]byte{})), &blockblob.UploadOptions{ + Metadata: blobMetadata, + AccessConditions: &blob.AccessConditions{ + ModifiedAccessConditions: &blob.ModifiedAccessConditions{ + IfNoneMatch: to.Ptr(azcore.ETag("*")), + }, + }, + }) - if !bloberror.HasCode(err, bloberror.BlobNotFound) { - return nil, "", err - } + if err != nil { + return nil, "", err + } - // in JS they check to see if the error is BlobNotFound. If it is, then they - // do a full upload of a blob instead. - uploadResp, err := blobClient.Upload(ctx, streaming.NopCloser(bytes.NewReader([]byte{})), &blockblob.UploadOptions{ - Metadata: blobMetadata, - }) + return uploadResp.LastModified, *uploadResp.ETag, nil +} - if err != nil { - return nil, "", err - } +// setCheckpointMetadata sets the metadata for a checkpoint, falling back to creating +// the blob if it doesn't already exist. +// +// NOTE: unlike [setOwnershipMetadata] this function doesn't attempt to prevent simultaneous +// checkpoint updates - ownership is assumed. +func (b *BlobStore) setCheckpointMetadata(ctx context.Context, blobName string, checkpoint azeventhubs.Checkpoint) (*time.Time, azcore.ETag, error) { + blobMetadata := newCheckpointBlobMetadata(checkpoint) + blobClient := b.cc.NewBlockBlobClient(blobName) + + setMetadataResp, err := blobClient.SetMetadata(ctx, blobMetadata, nil) + + if err == nil { + return setMetadataResp.LastModified, *setMetadataResp.ETag, nil + } - return uploadResp.LastModified, *uploadResp.ETag, nil + if !bloberror.HasCode(err, bloberror.BlobNotFound) { + return nil, "", err } + + uploadResp, err := blobClient.Upload(ctx, streaming.NopCloser(bytes.NewReader([]byte{})), &blockblob.UploadOptions{ + Metadata: blobMetadata, + }) + + if err != nil { + return nil, "", err + } + + return uploadResp.LastModified, *uploadResp.ETag, nil } func nameForCheckpointBlob(a azeventhubs.Checkpoint) (string, error) { diff --git a/sdk/messaging/azeventhubs/checkpoints/blob_store_test.go b/sdk/messaging/azeventhubs/checkpoints/blob_store_test.go index 0537e60ac5e6..4605fdcb33ba 100644 --- a/sdk/messaging/azeventhubs/checkpoints/blob_store_test.go +++ b/sdk/messaging/azeventhubs/checkpoints/blob_store_test.go @@ -4,6 +4,7 @@ package checkpoints_test import ( "context" + "fmt" "os" "strconv" "testing" @@ -216,6 +217,126 @@ func TestBlobStore_ListAndClaim(t *testing.T) { require.Empty(t, claimedOwnerships) } +func TestBlobStore_OnlyOneOwnershipClaimSucceeds(t *testing.T) { + testData := getContainerClient(t) + defer testData.Cleanup() + + cc, err := container.NewClientFromConnectionString(testData.ConnectionString, testData.ContainerName, nil) + require.NoError(t, err) + + store, err := checkpoints.NewBlobStore(cc, nil) + require.NoError(t, err) + + // we're going to make multiple calls to the blob store but only _one_ should succeed + // since it's "first one in wins" + claimsCh := make(chan []azeventhubs.Ownership, 20) + + t.Logf("Starting %d goroutines to claim ownership without an etag", cap(claimsCh)) + + // attempt to claim the same partition from multiple goroutines. Only _one_ of the + // goroutines should walk away thinking it claimed the partition. + for i := 0; i < cap(claimsCh); i++ { + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + ownerships, err := store.ClaimOwnership(ctx, []azeventhubs.Ownership{ + {ConsumerGroup: azeventhubs.DefaultConsumerGroup, EventHubName: "name", FullyQualifiedNamespace: "ns", PartitionID: "0", OwnerID: "ownerID"}, + }, nil) + + if err != nil { + claimsCh <- nil + require.NoError(t, err) + } else { + claimsCh <- ownerships + } + }() + } + + claimed := map[string]bool{} + numFailedClaims := 0 + + for i := 0; i < cap(claimsCh); i++ { + claims := <-claimsCh + + if claims == nil { + numFailedClaims++ + continue + } + + for _, claim := range claims { + require.False(t, claimed[claim.PartitionID], fmt.Sprintf("Partition ID %s was claimed more than once", claim.PartitionID)) + require.NotNil(t, claim.ETag) + claimed[claim.PartitionID] = true + } + } + + require.Equal(t, cap(claimsCh)-1, numFailedClaims, fmt.Sprintf("One of the 1/%d wins and the rest all fail to claim", cap(claimsCh))) +} + +func TestBlobStore_OnlyOneOwnershipUpdateSucceeds(t *testing.T) { + testData := getContainerClient(t) + defer testData.Cleanup() + + cc, err := container.NewClientFromConnectionString(testData.ConnectionString, testData.ContainerName, nil) + require.NoError(t, err) + + store, err := checkpoints.NewBlobStore(cc, nil) + require.NoError(t, err) + + // we're going to make multiple calls to the blob store but only _one_ should succeed + // since it's "first one in wins" + claimsCh := make(chan []azeventhubs.Ownership, 20) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + ownerships, err := store.ClaimOwnership(ctx, []azeventhubs.Ownership{ + {ConsumerGroup: azeventhubs.DefaultConsumerGroup, EventHubName: "name", FullyQualifiedNamespace: "ns", PartitionID: "0", OwnerID: "ownerID"}, + }, nil) + require.NoError(t, err) + require.Equal(t, "0", ownerships[0].PartitionID) + require.NotNil(t, ownerships[0].ETag) + + t.Logf("Starting %d goroutines to claim ownership without an etag", cap(claimsCh)) + + // attempt to claim the same partition from multiple goroutines. Only _one_ of the + // goroutines should walk away thinking it claimed the partition. + for i := 0; i < cap(claimsCh); i++ { + go func() { + + ownerships, err := store.ClaimOwnership(ctx, ownerships, nil) + + if err != nil { + claimsCh <- nil + require.NoError(t, err) + } else { + claimsCh <- ownerships + } + }() + } + + claimed := map[string]bool{} + numFailedClaims := 0 + + for i := 0; i < cap(claimsCh); i++ { + claims := <-claimsCh + + if claims == nil { + numFailedClaims++ + continue + } + + for _, claim := range claims { + require.False(t, claimed[claim.PartitionID], fmt.Sprintf("Partition ID %s was claimed more than once", claim.PartitionID)) + require.NotNil(t, claim.ETag) + claimed[claim.PartitionID] = true + } + } + + require.Equal(t, cap(claimsCh)-1, numFailedClaims, fmt.Sprintf("One of the 1/%d wins and the rest all fail to claim", cap(claimsCh))) +} + func getContainerClient(t *testing.T) struct { ConnectionString string ContainerName string diff --git a/sdk/messaging/azeventhubs/internal/test/test_helpers.go b/sdk/messaging/azeventhubs/internal/test/test_helpers.go index bfe1aa3c1bb0..97f7e5cfad83 100644 --- a/sdk/messaging/azeventhubs/internal/test/test_helpers.go +++ b/sdk/messaging/azeventhubs/internal/test/test_helpers.go @@ -40,6 +40,13 @@ func CaptureLogsForTest() func() []string { func CaptureLogsForTestWithChannel(messagesCh chan string) func() []string { setAzLogListener(func(e azlog.Event, s string) { + defer func() { + if err := recover(); err != nil { + fmt.Printf("FAILED SENDING MESSAGE (%s), message was: [%s] %s\n", err, e, s) + panic(err) + } + }() + messagesCh <- fmt.Sprintf("[%s] %s", e, s) })