Skip to content

Commit

Permalink
[azeventhubs] Client identifier support (#20045)
Browse files Browse the repository at this point in the history
Event Hubs has an optional parameter you can pass that lets you identify a consumer link. This name will come back in error messages like when link ownership is contested. Given the distributed nature of consumers this identifier can help diagnose issues where consumers are battling over links.

Fixes #15074
  • Loading branch information
richardpark-msft authored Feb 28, 2023
1 parent 2877174 commit 80187b3
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 121 deletions.
8 changes: 7 additions & 1 deletion sdk/messaging/azeventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
# Release History

## 0.5.1 (Unreleased)
## 0.6.0 (Unreleased)

### Features Added

- Added the `ConsumerClientOptions.InstanceID` field. This optional field can enhance error messages from
Event Hubs. For example, error messages related to ownership changes for a partition will contain the
name of the link that has taken ownership, which can help with traceability.

### Breaking Changes

- `ConsumerClient.ID()` renamed to `ConsumerClient.InstanceID()`.

### Bugs Fixed

### Other Changes
Expand Down
53 changes: 39 additions & 14 deletions sdk/messaging/azeventhubs/consumer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,31 +39,40 @@ const (

// ConsumerClientOptions configures optional parameters for a ConsumerClient.
type ConsumerClientOptions struct {
// TLSConfig configures a client with a custom *tls.Config.
TLSConfig *tls.Config

// Application ID that will be passed to the namespace.
// ApplicationID is used as the identifier when setting the User-Agent property.
ApplicationID string

// InstanceID is a unique name used to identify the consumer. This can help with
// diagnostics as this name will be returned in error messages. By default,
// an identifier will be automatically generated.
InstanceID string

// NewWebSocketConn is a function that can create a net.Conn for use with websockets.
// For an example, see ExampleNewClient_usingWebsockets() function in example_client_test.go.
NewWebSocketConn func(ctx context.Context, args WebSocketConnParams) (net.Conn, error)

// RetryOptions controls how often operations are retried from this client and any
// Receivers and Senders created from this client.
RetryOptions RetryOptions

// TLSConfig configures a client with a custom *tls.Config.
TLSConfig *tls.Config
}

// ConsumerClient can create PartitionClient instances, which can read events from
// a partition.
type ConsumerClient struct {
consumerGroup string
eventHub string
retryOptions RetryOptions
namespace *internal.Namespace
links *internal.Links[amqpwrap.AMQPReceiverCloser]

clientID string
// instanceID is a customer supplied instanceID that can be passed to Event Hubs.
// It'll be returned in error messages and can be useful for customers when
// troubleshooting.
instanceID string

links *internal.Links[amqpwrap.AMQPReceiverCloser]
namespace *internal.Namespace
retryOptions RetryOptions
}

// NewConsumerClient creates a ConsumerClient which uses an azcore.TokenCredential for authentication. You
Expand Down Expand Up @@ -141,6 +150,7 @@ func (cc *ConsumerClient) NewPartitionClient(partitionID string, options *Partit
namespace: cc.namespace,
eventHub: cc.eventHub,
partitionID: partitionID,
instanceID: cc.instanceID,
consumerGroup: cc.consumerGroup,
retryOptions: cc.retryOptions,
}, options)
Expand Down Expand Up @@ -170,9 +180,9 @@ func (cc *ConsumerClient) GetPartitionProperties(ctx context.Context, partitionI
return getPartitionProperties(ctx, cc.namespace, rpcLink.Link, cc.eventHub, partitionID, options)
}

// ID is the identifier for this ConsumerClient.
func (cc *ConsumerClient) ID() string {
return cc.clientID
// InstanceID is the identifier for this ConsumerClient.
func (cc *ConsumerClient) InstanceID() string {
return cc.instanceID
}

type consumerClientDetails struct {
Expand All @@ -187,7 +197,7 @@ func (cc *ConsumerClient) getDetails() consumerClientDetails {
FullyQualifiedNamespace: cc.namespace.FQDN,
ConsumerGroup: cc.consumerGroup,
EventHubName: cc.eventHub,
ClientID: cc.clientID,
ClientID: cc.InstanceID(),
}
}

Expand All @@ -212,7 +222,7 @@ func newConsumerClient(args consumerClientArgs, options *ConsumerClientOptions)
options = &ConsumerClientOptions{}
}

clientUUID, err := uuid.New()
instanceID, err := getInstanceID(options.InstanceID)

if err != nil {
return nil, err
Expand All @@ -221,7 +231,7 @@ func newConsumerClient(args consumerClientArgs, options *ConsumerClientOptions)
client := &ConsumerClient{
consumerGroup: args.consumerGroup,
eventHub: args.eventHub,
clientID: clientUUID.String(),
instanceID: instanceID,
}

var nsOptions []internal.NamespaceOption
Expand Down Expand Up @@ -263,3 +273,18 @@ func newConsumerClient(args consumerClientArgs, options *ConsumerClientOptions)

return client, nil
}

func getInstanceID(optionalID string) (string, error) {
if optionalID != "" {
return optionalID, nil
}

// generate a new one
id, err := uuid.New()

if err != nil {
return "", err
}

return id.String(), nil
}
118 changes: 96 additions & 22 deletions sdk/messaging/azeventhubs/consumer_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ package azeventhubs_test

import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/internal/uuid"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/test"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub"
Expand Down Expand Up @@ -710,6 +712,66 @@ func TestConsumerClient_StartPosition_Latest(t *testing.T) {
}
}

func TestConsumerClient_InstanceID(t *testing.T) {
testParams := test.GetConnectionParamsForTest(t)

var instanceID string

// create a partition client with owner level 1 that's fully initialized.
{
producerClient, err := azeventhubs.NewProducerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, nil)
require.NoError(t, err)
defer test.RequireClose(t, producerClient)

props := sendEventToPartition(t, producerClient, "0", []*azeventhubs.EventData{
{Body: []byte("hello")},
})

consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, azeventhubs.DefaultConsumerGroup, &azeventhubs.ConsumerClientOptions{
// We'll just let this one be auto-generated.
//InstanceID: "",
})
require.NoError(t, err)
defer test.RequireClose(t, consumerClient)

parsedUUID, err := uuid.Parse(consumerClient.InstanceID())
require.NotZero(t, parsedUUID)
require.NoError(t, err)

instanceID = consumerClient.InstanceID()

partitionClient, err := consumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
OwnerLevel: to.Ptr(int64(1)),
StartPosition: getStartPosition(props),
})
require.NoError(t, err)

// receive an event so we know the link is alive.
events, err := partitionClient.ReceiveEvents(context.Background(), 1, nil)
require.NotEmpty(t, events)
require.NoError(t, err)
}

failedConsumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, azeventhubs.DefaultConsumerGroup, &azeventhubs.ConsumerClientOptions{
InstanceID: "LosesBecauseOfLowOwnerLevel",
RetryOptions: azeventhubs.RetryOptions{
MaxRetries: -1, // just fail immediately, don't retry.
},
})
require.NoError(t, err)
defer test.RequireClose(t, failedConsumerClient)

failedPartitionClient, err := failedConsumerClient.NewPartitionClient("0", &azeventhubs.PartitionClientOptions{
// the other partition client already has the partition open with owner level 1. So our attempt to connect will fail.
OwnerLevel: to.Ptr(int64(0)),
})
require.NoError(t, err)

_, err = failedPartitionClient.ReceiveEvents(context.Background(), 1, nil)

require.Contains(t, err.Error(), fmt.Sprintf("Description: Receiver '%s' with a higher epoch '1' already exists. Receiver 'LosesBecauseOfLowOwnerLevel' with epoch 0 cannot be created. Make sure you are creating receiver with increasing epoch value to ensure connectivity, or ensure all old epoch receivers are closed or disconnected", instanceID))
}

// mustSendEventsToAllPartitions sends the event given in evt to each partition in the
// eventHub, returning the sequence number just before the new message.
//
Expand Down Expand Up @@ -741,29 +803,8 @@ func mustSendEventsToAllPartitions(t *testing.T, events []*azeventhubs.EventData
go func(partitionID string) {
defer wg.Done()

partProps, err := producer.GetPartitionProperties(context.Background(), partitionID, nil)
require.NoError(t, err)
partProps := sendEventToPartition(t, producer, partitionID, events)
partitionsCh <- partProps

// send the message to the partition.
batch, err := producer.NewEventDataBatch(context.Background(), &azeventhubs.EventDataBatchOptions{
PartitionID: &partitionID,
})
require.NoError(t, err)

for _, event := range events {
if event.Properties == nil {
event.Properties = map[string]any{}
}

event.Properties["DestPartitionID"] = partitionID

err = batch.AddEventData(event, nil)
require.NoError(t, err)
}

err = producer.SendEventDataBatch(context.Background(), batch, nil)
require.NoError(t, err)
}(partitionID)
}

Expand Down Expand Up @@ -802,3 +843,36 @@ func getSortedBodies(events []*azeventhubs.ReceivedEventData) []string {

return bodies
}

func sendEventToPartition(t *testing.T, producer *azeventhubs.ProducerClient, partitionID string, events []*azeventhubs.EventData) azeventhubs.PartitionProperties {
partProps, err := producer.GetPartitionProperties(context.Background(), partitionID, nil)
require.NoError(t, err)

// send the message to the partition.
batch, err := producer.NewEventDataBatch(context.Background(), &azeventhubs.EventDataBatchOptions{
PartitionID: &partitionID,
})
require.NoError(t, err)

for _, event := range events {
eventToSend := *event

props := map[string]any{
"DestPartitionID": partitionID,
}

for k, v := range event.Properties {
props[k] = v
}

eventToSend.Properties = props

err = batch.AddEventData(event, nil)
require.NoError(t, err)
}

err = producer.SendEventDataBatch(context.Background(), batch, nil)
require.NoError(t, err)

return partProps
}
2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/internal/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
package internal

// Version is the semantic version number
const Version = "v0.5.1"
const Version = "v0.6.0"
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (inf *processorStressTest) Run(ctx context.Context) error {
return err
}

shortConsumerID := string(cc.ID()[0:5])
shortConsumerID := string(cc.InstanceID()[0:5])

go func() {
for {
Expand Down
Loading

0 comments on commit 80187b3

Please sign in to comment.