Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azservicebus, azeventhubs] Updating to take in the new AMQP version. #22509

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions sdk/messaging/azeventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
# Release History

## 1.0.4 (Unreleased)

### Features Added

### Breaking Changes
## 1.0.4 (2024-03-05)

### Bugs Fixed

### Other Changes
- Fixed case where closing a Receiver/Sender after an idle period would take > 20 seconds. (PR#22509)

## 1.0.3 (2024-01-16)

Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.0
github.com/Azure/go-amqp v1.0.4
github.com/Azure/go-amqp v1.0.5-0.20240301200753-2dff4b36f85f
github.com/golang/mock v1.6.0
github.com/joho/godotenv v1.5.1
github.com/stretchr/testify v1.8.4
Expand Down
2 changes: 2 additions & 0 deletions sdk/messaging/azeventhubs/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.0 h1:IfFdxTUDiV58iZqPK
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.0/go.mod h1:SUZc9YRRHfx2+FAQKNDGrssXehqLpxmwRv2mC/5ntj4=
github.com/Azure/go-amqp v1.0.4 h1:GX5OFOs706UjuFRD5PDKm3aOuLQ92F7DMbua+DKAYCc=
github.com/Azure/go-amqp v1.0.4/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/Azure/go-amqp v1.0.5-0.20240301200753-2dff4b36f85f h1:9yUxNoINCNZaa2wmW6Kl8gLkUiyLmxby6ZjCCeRYCB0=
github.com/Azure/go-amqp v1.0.5-0.20240301200753-2dff4b36f85f/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 h1:DzHpqpoJVaCgOUdVHxE8QB52S6NiVdDQvGlny1qvPqA=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down
6 changes: 3 additions & 3 deletions sdk/messaging/azeventhubs/internal/eh/stress/Chart.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
dependencies:
- name: stress-test-addons
repository: https://stresstestcharts.blob.core.windows.net/helm/
version: 0.3.0
digest: sha256:3e21a7fdf5d6b37e871a6dd9f755888166fbb24802aa517f51d1d9223b47656e
generated: "2023-09-26T11:39:54.587519919-07:00"
version: 0.3.1
digest: sha256:28e374f8db5c46447b2a1491d4361ceb126536c425cbe54be49017120fe7b27d
generated: "2024-03-01T17:51:06.962215142-08:00"
9 changes: 3 additions & 6 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
# Release History

## 1.6.1 (Unreleased)

### Features Added

### Breaking Changes
## 1.6.1 (2024-03-05)

### Bugs Fixed

### Other Changes
- Fixed case where closing a Receiver/Sender after an idle period would take > 20 seconds. (PR#22509)
- Fixed a potential memory leak when receiving a message on one receiver and attempting to settle with another. (PR#22431)

## 1.6.0 (2024-01-17)

Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azservicebus/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1
github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2
github.com/Azure/go-amqp v1.0.4
github.com/Azure/go-amqp v1.0.5-0.20240301200753-2dff4b36f85f
)

require (
Expand Down
2 changes: 2 additions & 0 deletions sdk/messaging/azservicebus/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 h1:LqbJ/WzJUwBf8UiaSzgX7aM
github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2/go.mod h1:yInRyqWXAuaPrgI7p70+lDDgh3mlBohis29jGMISnmc=
github.com/Azure/go-amqp v1.0.4 h1:GX5OFOs706UjuFRD5PDKm3aOuLQ92F7DMbua+DKAYCc=
github.com/Azure/go-amqp v1.0.4/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/Azure/go-amqp v1.0.5-0.20240301200753-2dff4b36f85f h1:9yUxNoINCNZaa2wmW6Kl8gLkUiyLmxby6ZjCCeRYCB0=
github.com/Azure/go-amqp v1.0.5-0.20240301200753-2dff4b36f85f/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ matrix:
mostlyIdleReceiver:
testTarget: mostlyIdleReceiver
memory: "0.5Gi"
openCloseMeasurements:
testTarget: openCloseMeasurements
memory: "0.5Gi"
args: -rounds 100
rapidOpenClose:
testTarget: rapidOpenClose
memory: "0.5Gi"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ type StressContextOptions struct {
// Duration is the amount of time the stress test should run before
// the StressContext.Context expires.
Duration time.Duration

// CommonBaggage will be added as part of the telemetry client, and will be included in each
// metric/event/error that's reported.
CommonBaggage map[string]string

// EmitStartEvent enables the automatic sending of the "Start" event for our test to telemetry.
EmitStartEvent bool
}

func MustCreateStressContext(testName string, options *StressContextOptions) *StressContext {
Expand All @@ -107,6 +114,12 @@ func MustCreateStressContext(testName string, options *StressContextOptions) *St
"TestRunId": testRunID,
}

if options != nil && options.CommonBaggage != nil {
for k, v := range options.CommonBaggage {
telemetryClient.Context().CommonProperties[k] = v
}
}

log.Printf("Common properties\n:%#v", telemetryClient.Context().CommonProperties)

ctx, cancel := NewCtrlCContext()
Expand Down Expand Up @@ -144,7 +157,7 @@ func MustCreateStressContext(testName string, options *StressContextOptions) *St
// return nil
// })

return &StressContext{
sc := &StressContext{
TestRunID: testRunID,
Nano: testRunID, // the same for now
ConnectionString: cs,
Expand All @@ -156,6 +169,12 @@ func MustCreateStressContext(testName string, options *StressContextOptions) *St
Context: ctx,
cancel: cancel,
}

if options != nil && options.EmitStartEvent {
sc.Start(testName, nil)
}

return sc
}

func (sc *StressContext) Start(entityName string, attributes map[string]string) {
Expand All @@ -173,7 +192,7 @@ func (sc *StressContext) Start(entityName string, attributes map[string]string)
}

func (sc *StressContext) End() {
log.Printf("Stopping and flushing telemetry")
log.Printf("Stopping and flushing telemetry: %#v", sc.TC.Context().CommonProperties)

sc.cancel()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ spec:
set -ex;
mkdir -p "$DEBUG_SHARE";
{{ if ne .Stress.benchmark true }}
/app/stress tests "{{ .Stress.testTarget }}" 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log";
/app/stress tests "{{ .Stress.testTarget }}" {{ default "" .Stress.args }} 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log";
{{ else }}
/app/benchmarks.test -test.timeout 24h -test.bench {{ .Stress.testTarget }} 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log";
{{ end }}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package tests

import (
"context"
"flag"
"fmt"
"log"
"strconv"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/stress/shared"
)

// OpenCloseMeasurements tests that we are able to consistently open and close our links and connections
// in a timely way. This test doesn't immediately fail, it's primary purpose is just to provide historical
// and measurable data on our performance.
//
// The origin of this test was a bug we found in go-amqp where, if the frame was too small, it wouldn't parse
// and return it until we received more data (any data, just so long as it caused our total local buffer to exceed
// 8 bytes) [PR#320]
//
// PR#320: https://github.com/Azure/go-amqp/pull/320
func OpenCloseMeasurements(remainingArgs []string) {
type testArgs struct {
SleepDuration time.Duration
MessageCount int
BodySize int
}

fs := flag.NewFlagSet("args", flag.PanicOnError)
numRounds := fs.Int("rounds", 10, "The number of rounds of sends and closes to run")
_ = fs.Parse(remainingArgs)

fn := func(args testArgs) {
sc := shared.MustCreateStressContext("OpenCloseMeasurements", &shared.StressContextOptions{
CommonBaggage: map[string]string{
"SleepDuration": args.SleepDuration.String(),
"MessageCount": strconv.FormatInt(int64(args.MessageCount), 10),
"BodySize": strconv.FormatInt(int64(args.BodySize), 10),
},
EmitStartEvent: true,
})

defer sc.End()

queueName := fmt.Sprintf("OpenCloseMeasurements-%s", sc.Nano)
_ = shared.MustCreateAutoDeletingQueue(sc, queueName, &admin.QueueProperties{})

client, err := azservicebus.NewClientFromConnectionString(sc.ConnectionString, nil)
sc.PanicOnError("failed to create client", err)

trackingSender, err := shared.NewTrackingSender(sc.TC, client, queueName, nil)
sc.PanicOnError("failed to create sender", err)

log.Printf("Sending message to warm up connection and links.")

body := make([]byte, args.BodySize)

for i := 0; i < args.MessageCount; i++ {
err = trackingSender.SendMessage(context.Background(), &azservicebus.Message{
Body: body,
}, nil)
sc.NoErrorf(err, "failed to send message %d", i)
}

log.Printf("Sleeping for %s, done at %s...", args.SleepDuration, time.Now().Add(args.SleepDuration))
time.Sleep(args.SleepDuration)

log.Printf("Done sleeping, now attempting to close link")
// the error is reported for now to metrics - not going to kill this as we have a bug where
// the "detach because idle" error comes back from Close() right now.

start := time.Now()
max := 10 * time.Second
_ = trackingSender.Close(context.Background())

if time.Since(start) > max {
sc.PanicOnError("Slow close", fmt.Errorf("Took longer than %s", max))
}
}

// some simple cases
testCases := []testArgs{
{1 * time.Minute, 1, 10},
{5 * time.Minute, 100, 100},
{5 * time.Minute, 100, 10000},
{5 * time.Minute, 1, 10},
{10*time.Minute + 30*time.Second, 1, 10},
{11 * time.Minute, 1, 10},
}

for i := 0; i < *numRounds; i++ {
wg := sync.WaitGroup{}

for _, args := range testCases {
wg.Add(1)

go func(args testArgs) {
defer wg.Done()
fn(args)
}(args)
}

wg.Wait()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func Run(remainingArgs []string) {
"infiniteSendAndReceive": InfiniteSendAndReceiveRun,
"longRunningRenewLock": LongRunningRenewLockTest,
"mostlyIdleReceiver": MostlyIdleReceiver,
"openCloseMeasurements": OpenCloseMeasurements,
"rapidOpenClose": RapidOpenCloseTest,
"receiveCancellation": ReceiveCancellation,
"sendAndReceiveDrain": SendAndReceiveDrain,
Expand Down