diff --git a/sdk/messaging/azeventhubs/CHANGELOG.md b/sdk/messaging/azeventhubs/CHANGELOG.md index e4472b7ca725..22a4df4a41a2 100644 --- a/sdk/messaging/azeventhubs/CHANGELOG.md +++ b/sdk/messaging/azeventhubs/CHANGELOG.md @@ -1,14 +1,10 @@ # Release History -## 1.0.4 (Unreleased) - -### Features Added - -### Breaking Changes +## 1.0.4 (2024-03-05) ### Bugs Fixed -### Other Changes +- Fixed case where closing a Receiver/Sender after an idle period would take > 20 seconds. (PR#22509) ## 1.0.3 (2024-01-16) diff --git a/sdk/messaging/azeventhubs/go.mod b/sdk/messaging/azeventhubs/go.mod index 1378df00d8ba..95dbaeab77de 100644 --- a/sdk/messaging/azeventhubs/go.mod +++ b/sdk/messaging/azeventhubs/go.mod @@ -8,7 +8,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.0 - github.com/Azure/go-amqp v1.0.4 + github.com/Azure/go-amqp v1.0.5 github.com/golang/mock v1.6.0 github.com/joho/godotenv v1.5.1 github.com/stretchr/testify v1.8.4 diff --git a/sdk/messaging/azeventhubs/go.sum b/sdk/messaging/azeventhubs/go.sum index 6c020e53e999..21cde13d5429 100644 --- a/sdk/messaging/azeventhubs/go.sum +++ b/sdk/messaging/azeventhubs/go.sum @@ -12,8 +12,8 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0. github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.5.0 h1:AifHbc4mg0x9zW52WOpKbsHaDKuRhlI7TVl47thgQ70= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.0 h1:IfFdxTUDiV58iZqPKgyWiz4X4fCxZeQ1pTQPImLYXpY= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.0/go.mod h1:SUZc9YRRHfx2+FAQKNDGrssXehqLpxmwRv2mC/5ntj4= -github.com/Azure/go-amqp v1.0.4 h1:GX5OFOs706UjuFRD5PDKm3aOuLQ92F7DMbua+DKAYCc= -github.com/Azure/go-amqp v1.0.4/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= +github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU= +github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 h1:DzHpqpoJVaCgOUdVHxE8QB52S6NiVdDQvGlny1qvPqA= github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/sdk/messaging/azeventhubs/internal/eh/stress/Chart.lock b/sdk/messaging/azeventhubs/internal/eh/stress/Chart.lock index 82ce26e879d8..7a2116fd5bfd 100644 --- a/sdk/messaging/azeventhubs/internal/eh/stress/Chart.lock +++ b/sdk/messaging/azeventhubs/internal/eh/stress/Chart.lock @@ -1,6 +1,6 @@ dependencies: - name: stress-test-addons repository: https://stresstestcharts.blob.core.windows.net/helm/ - version: 0.3.0 -digest: sha256:3e21a7fdf5d6b37e871a6dd9f755888166fbb24802aa517f51d1d9223b47656e -generated: "2023-09-26T11:39:54.587519919-07:00" + version: 0.3.1 +digest: sha256:28e374f8db5c46447b2a1491d4361ceb126536c425cbe54be49017120fe7b27d +generated: "2024-03-01T17:51:06.962215142-08:00" diff --git a/sdk/messaging/azservicebus/CHANGELOG.md b/sdk/messaging/azservicebus/CHANGELOG.md index fbccc21daead..d387960a72a4 100644 --- a/sdk/messaging/azservicebus/CHANGELOG.md +++ b/sdk/messaging/azservicebus/CHANGELOG.md @@ -1,14 +1,11 @@ # Release History -## 1.6.1 (Unreleased) - -### Features Added - -### Breaking Changes +## 1.6.1 (2024-03-05) ### Bugs Fixed -### Other Changes +- Fixed case where closing a Receiver/Sender after an idle period would take > 20 seconds. (PR#22509) +- Fixed a potential memory leak when receiving a message on one receiver and attempting to settle with another. (PR#22431) ## 1.6.0 (2024-01-17) diff --git a/sdk/messaging/azservicebus/go.mod b/sdk/messaging/azservicebus/go.mod index 30fd8a6de2be..6be3efea4361 100644 --- a/sdk/messaging/azservicebus/go.mod +++ b/sdk/messaging/azservicebus/go.mod @@ -8,7 +8,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 - github.com/Azure/go-amqp v1.0.4 + github.com/Azure/go-amqp v1.0.5 ) require ( diff --git a/sdk/messaging/azservicebus/go.sum b/sdk/messaging/azservicebus/go.sum index 6ef5d1b82982..d1cefbf63c22 100644 --- a/sdk/messaging/azservicebus/go.sum +++ b/sdk/messaging/azservicebus/go.sum @@ -7,8 +7,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 h1:sO0/P7g68FrryJzljemN+ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1/go.mod h1:h8hyGFDsU5HMivxiS2iYFZsgDbU9OnnJ163x5UGVKYo= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 h1:LqbJ/WzJUwBf8UiaSzgX7aMclParm9/5Vgp+TY51uBQ= github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2/go.mod h1:yInRyqWXAuaPrgI7p70+lDDgh3mlBohis29jGMISnmc= -github.com/Azure/go-amqp v1.0.4 h1:GX5OFOs706UjuFRD5PDKm3aOuLQ92F7DMbua+DKAYCc= -github.com/Azure/go-amqp v1.0.4/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= +github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU= +github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= diff --git a/sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml b/sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml index 5bfc5f893d50..b437b5fd68fa 100644 --- a/sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml +++ b/sdk/messaging/azservicebus/internal/stress/scenarios-matrix.yaml @@ -50,6 +50,10 @@ matrix: mostlyIdleReceiver: testTarget: mostlyIdleReceiver memory: "0.5Gi" + openCloseMeasurements: + testTarget: openCloseMeasurements + memory: "0.5Gi" + args: -rounds 100 rapidOpenClose: testTarget: rapidOpenClose memory: "0.5Gi" diff --git a/sdk/messaging/azservicebus/internal/stress/shared/stress_context.go b/sdk/messaging/azservicebus/internal/stress/shared/stress_context.go index ce34e9278790..704b13448e98 100644 --- a/sdk/messaging/azservicebus/internal/stress/shared/stress_context.go +++ b/sdk/messaging/azservicebus/internal/stress/shared/stress_context.go @@ -81,6 +81,13 @@ type StressContextOptions struct { // Duration is the amount of time the stress test should run before // the StressContext.Context expires. Duration time.Duration + + // CommonBaggage will be added as part of the telemetry client, and will be included in each + // metric/event/error that's reported. + CommonBaggage map[string]string + + // EmitStartEvent enables the automatic sending of the "Start" event for our test to telemetry. + EmitStartEvent bool } func MustCreateStressContext(testName string, options *StressContextOptions) *StressContext { @@ -107,6 +114,12 @@ func MustCreateStressContext(testName string, options *StressContextOptions) *St "TestRunId": testRunID, } + if options != nil && options.CommonBaggage != nil { + for k, v := range options.CommonBaggage { + telemetryClient.Context().CommonProperties[k] = v + } + } + log.Printf("Common properties\n:%#v", telemetryClient.Context().CommonProperties) ctx, cancel := NewCtrlCContext() @@ -144,7 +157,7 @@ func MustCreateStressContext(testName string, options *StressContextOptions) *St // return nil // }) - return &StressContext{ + sc := &StressContext{ TestRunID: testRunID, Nano: testRunID, // the same for now ConnectionString: cs, @@ -156,6 +169,12 @@ func MustCreateStressContext(testName string, options *StressContextOptions) *St Context: ctx, cancel: cancel, } + + if options != nil && options.EmitStartEvent { + sc.Start(testName, nil) + } + + return sc } func (sc *StressContext) Start(entityName string, attributes map[string]string) { @@ -173,7 +192,7 @@ func (sc *StressContext) Start(entityName string, attributes map[string]string) } func (sc *StressContext) End() { - log.Printf("Stopping and flushing telemetry") + log.Printf("Stopping and flushing telemetry: %#v", sc.TC.Context().CommonProperties) sc.cancel() diff --git a/sdk/messaging/azservicebus/internal/stress/templates/stress-test-job.yaml b/sdk/messaging/azservicebus/internal/stress/templates/stress-test-job.yaml index 978e70871b3c..39c40ba5bf2a 100644 --- a/sdk/messaging/azservicebus/internal/stress/templates/stress-test-job.yaml +++ b/sdk/messaging/azservicebus/internal/stress/templates/stress-test-job.yaml @@ -18,7 +18,7 @@ spec: set -ex; mkdir -p "$DEBUG_SHARE"; {{ if ne .Stress.benchmark true }} - /app/stress tests "{{ .Stress.testTarget }}" 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log"; + /app/stress tests "{{ .Stress.testTarget }}" {{ default "" .Stress.args }} 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log"; {{ else }} /app/benchmarks.test -test.timeout 24h -test.bench {{ .Stress.testTarget }} 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log"; {{ end }} diff --git a/sdk/messaging/azservicebus/internal/stress/tests/open_close_measurements.go b/sdk/messaging/azservicebus/internal/stress/tests/open_close_measurements.go new file mode 100644 index 000000000000..079f93a30df3 --- /dev/null +++ b/sdk/messaging/azservicebus/internal/stress/tests/open_close_measurements.go @@ -0,0 +1,112 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package tests + +import ( + "context" + "flag" + "fmt" + "log" + "strconv" + "sync" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/stress/shared" +) + +// OpenCloseMeasurements tests that we are able to consistently open and close our links and connections +// in a timely way. This test doesn't immediately fail, it's primary purpose is just to provide historical +// and measurable data on our performance. +// +// The origin of this test was a bug we found in go-amqp where, if the frame was too small, it wouldn't parse +// and return it until we received more data (any data, just so long as it caused our total local buffer to exceed +// 8 bytes) [PR#320] +// +// PR#320: https://github.com/Azure/go-amqp/pull/320 +func OpenCloseMeasurements(remainingArgs []string) { + type testArgs struct { + SleepDuration time.Duration + MessageCount int + BodySize int + } + + fs := flag.NewFlagSet("args", flag.PanicOnError) + numRounds := fs.Int("rounds", 10, "The number of rounds of sends and closes to run") + _ = fs.Parse(remainingArgs) + + fn := func(args testArgs) { + sc := shared.MustCreateStressContext("OpenCloseMeasurements", &shared.StressContextOptions{ + CommonBaggage: map[string]string{ + "SleepDuration": args.SleepDuration.String(), + "MessageCount": strconv.FormatInt(int64(args.MessageCount), 10), + "BodySize": strconv.FormatInt(int64(args.BodySize), 10), + }, + EmitStartEvent: true, + }) + + defer sc.End() + + queueName := fmt.Sprintf("OpenCloseMeasurements-%s", sc.Nano) + _ = shared.MustCreateAutoDeletingQueue(sc, queueName, &admin.QueueProperties{}) + + client, err := azservicebus.NewClientFromConnectionString(sc.ConnectionString, nil) + sc.PanicOnError("failed to create client", err) + + trackingSender, err := shared.NewTrackingSender(sc.TC, client, queueName, nil) + sc.PanicOnError("failed to create sender", err) + + log.Printf("Sending message to warm up connection and links.") + + body := make([]byte, args.BodySize) + + for i := 0; i < args.MessageCount; i++ { + err = trackingSender.SendMessage(context.Background(), &azservicebus.Message{ + Body: body, + }, nil) + sc.NoErrorf(err, "failed to send message %d", i) + } + + log.Printf("Sleeping for %s, done at %s...", args.SleepDuration, time.Now().Add(args.SleepDuration)) + time.Sleep(args.SleepDuration) + + log.Printf("Done sleeping, now attempting to close link") + // the error is reported for now to metrics - not going to kill this as we have a bug where + // the "detach because idle" error comes back from Close() right now. + + start := time.Now() + max := 10 * time.Second + _ = trackingSender.Close(context.Background()) + + if time.Since(start) > max { + sc.PanicOnError("Slow close", fmt.Errorf("Took longer than %s", max)) + } + } + + // some simple cases + testCases := []testArgs{ + {1 * time.Minute, 1, 10}, + {5 * time.Minute, 100, 100}, + {5 * time.Minute, 100, 10000}, + {5 * time.Minute, 1, 10}, + {10*time.Minute + 30*time.Second, 1, 10}, + {11 * time.Minute, 1, 10}, + } + + for i := 0; i < *numRounds; i++ { + wg := sync.WaitGroup{} + + for _, args := range testCases { + wg.Add(1) + + go func(args testArgs) { + defer wg.Done() + fn(args) + }(args) + } + + wg.Wait() + } +} diff --git a/sdk/messaging/azservicebus/internal/stress/tests/tests.go b/sdk/messaging/azservicebus/internal/stress/tests/tests.go index bf85e1923616..905c6750979f 100644 --- a/sdk/messaging/azservicebus/internal/stress/tests/tests.go +++ b/sdk/messaging/azservicebus/internal/stress/tests/tests.go @@ -42,6 +42,7 @@ func Run(remainingArgs []string) { "infiniteSendAndReceive": InfiniteSendAndReceiveRun, "longRunningRenewLock": LongRunningRenewLockTest, "mostlyIdleReceiver": MostlyIdleReceiver, + "openCloseMeasurements": OpenCloseMeasurements, "rapidOpenClose": RapidOpenCloseTest, "receiveCancellation": ReceiveCancellation, "sendAndReceiveDrain": SendAndReceiveDrain,