Skip to content
This repository has been archived by the owner on Nov 16, 2023. It is now read-only.

Commit

Permalink
feat(iot-device): Add sample to demonstrate receive c2d callback API
Browse files Browse the repository at this point in the history
  • Loading branch information
abhipsaMisra authored Nov 5, 2020
1 parent 1619587 commit 2cb30ee
Show file tree
Hide file tree
Showing 10 changed files with 314 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ public class DeviceReconnectionSample
private readonly TransportType _transportType;

private readonly ILogger _logger;
private static DeviceClient s_deviceClient;

private static ConnectionStatus s_connectionStatus;
private static bool s_wasEverConnected;
// Mark these fields as volatile so that their latest values are referenced.
private static volatile DeviceClient s_deviceClient;
private static volatile ConnectionStatus s_connectionStatus = ConnectionStatus.Disconnected;

public DeviceReconnectionSample(List<string> deviceConnectionStrings, TransportType transportType, ILogger logger)
{
Expand All @@ -53,9 +53,10 @@ public DeviceReconnectionSample(List<string> deviceConnectionStrings, TransportT
InitializeClient();
}

private bool IsDeviceConnected => s_connectionStatus == ConnectionStatus.Connected;

public async Task RunSampleAsync()
{
Console.WriteLine("Press Control+C to quit the sample.");
using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (sender, eventArgs) =>
{
Expand All @@ -64,6 +65,8 @@ public async Task RunSampleAsync()
_logger.LogInformation("Sample execution cancellation requested; will exit.");
};

_logger.LogInformation($"Sample execution started, press Control+C to quit the sample.");

try
{
await Task.WhenAll(SendMessagesAsync(cts.Token), ReceiveMessagesAsync(cts.Token));
Expand All @@ -76,30 +79,39 @@ public async Task RunSampleAsync()

private void InitializeClient()
{
// If the client reports Connected status, it is already in operational state.
if (s_connectionStatus != ConnectionStatus.Connected
&& _deviceConnectionStrings.Any())
if (ShouldClientBeInitialized(s_connectionStatus))
{
// Allow a single thread to dispose and initialize the client instance.
lock (_initLock)
{
_logger.LogDebug($"Attempting to initialize the client instance, current status={s_connectionStatus}");

// If the device client instance has been previously initialized, then dispose it.
// The s_wasEverConnected variable is required to store if the client ever reported Connected status.
if (s_wasEverConnected && s_connectionStatus == ConnectionStatus.Disconnected)
if (ShouldClientBeInitialized(s_connectionStatus))
{
s_deviceClient?.Dispose();
s_wasEverConnected = false;
_logger.LogDebug($"Attempting to initialize the client instance, current status={s_connectionStatus}");

// If the device client instance has been previously initialized, then dispose it.
if (s_deviceClient != null)
{
s_deviceClient.Dispose();
s_deviceClient = null;
}
}

s_deviceClient = DeviceClient.CreateFromConnectionString(_deviceConnectionStrings.First(), _transportType);
var options = new ClientOptions
{
SdkAssignsMessageId = Shared.SdkAssignsMessageId.WhenUnset,
};

s_deviceClient = DeviceClient.CreateFromConnectionString(_deviceConnectionStrings.First(), _transportType, options);
s_deviceClient.SetConnectionStatusChangesHandler(ConnectionStatusChangeHandler);
s_deviceClient.OperationTimeoutInMilliseconds = (uint)s_operationTimeout.TotalMilliseconds;

_logger.LogDebug($"Initialized the client instance.");
}

try
{
// Force connection now
// Force connection now.
// OpenAsync() is an idempotent call, it has the same effect if called once or multiple times on the same client.
s_deviceClient.OpenAsync().GetAwaiter().GetResult();
_logger.LogDebug($"Initialized the client instance.");
}
Expand All @@ -119,8 +131,6 @@ private void ConnectionStatusChangeHandler(ConnectionStatus status, ConnectionSt
{
case ConnectionStatus.Connected:
_logger.LogDebug("### The DeviceClient is CONNECTED; all operations will be carried out as normal.");

s_wasEverConnected = true;
break;

case ConnectionStatus.Disconnected_Retrying:
Expand Down Expand Up @@ -190,7 +200,7 @@ private async Task SendMessagesAsync(CancellationToken cancellationToken)

while (!cancellationToken.IsCancellationRequested)
{
if (s_connectionStatus == ConnectionStatus.Connected)
if (IsDeviceConnected)
{
_logger.LogInformation($"Device sending message {++messageCount} to IoT Hub...");

Expand Down Expand Up @@ -231,13 +241,13 @@ private async Task ReceiveMessagesAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
if (s_connectionStatus != ConnectionStatus.Connected)
if (!IsDeviceConnected)
{
await Task.Delay(s_sleepDuration);
continue;
}

_logger.LogInformation($"Device waiting for C2D messages from the hub - for {s_sleepDuration}...");
_logger.LogInformation($"Device waiting for C2D messages from the hub for {s_sleepDuration}...");
_logger.LogInformation("Use the IoT Hub Azure Portal or Azure IoT Explorer to send a message to this device.");

try
Expand Down Expand Up @@ -298,5 +308,15 @@ private async Task ReceiveMessageAndCompleteAsync()
await s_deviceClient.CompleteAsync(receivedMessage);
_logger.LogInformation($"Completed message [{messageData}].");
}

// If the client reports Connected status, it is already in operational state.
// If the client reports Disconnected_retrying status, it is trying to recover its connection.
// If the client reports Disconnected status, you will need to dispose and recreate the client.
// If the client reports Disabled status, you will need to dispose and recreate the client.
private bool ShouldClientBeInitialized(ConnectionStatus connectionStatus)
{
return (connectionStatus == ConnectionStatus.Disconnected || connectionStatus == ConnectionStatus.Disabled)
&& _deviceConnectionStrings.Any();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

<ItemGroup>
<PackageReference Include="CommandLineParser" Version="2.8.0" />
<PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.31.0" />
<PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.33.0" />
</ItemGroup>

<ItemGroup>
Expand Down
107 changes: 107 additions & 0 deletions iot-hub/Samples/device/DeviceReconnectionSample/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,113 @@

This sample code demonstrates the various connection status changes and connection status change reasons the device client can return, and how to handle them.

### Initialize the client:

```csharp
// Connection string:
// Get the device connection string from Azure IoT Portal, or using Azure CLI.
// Azure portal -
// Navigate to your IoT Hub. From the left pane, under "Explorers", click on "IoT devices".
// Click and navigate to your device.
// Copy the connection strings listed (primary and/or secondary).
// Azure CLI -
// az iot hub device-identity connection-string show --device-id <device_id> [--key-type {primary, secondary}]
// --key-type is optional. It defaults to "primary".
//
// Transport type:
// The transport to use to communicate with the IoT Hub. Possible values include Mqtt,
// Mqtt_WebSocket_Only, Mqtt_Tcp_Only, Amqp, Amqp_WebSocket_Only, Amqp_Tcp_only, and Http1.
string connectionString = "<connection_string>";
TransportType transportType = TransportType.Mqtt;

// This option is helpful in delegating the assignment of Message.MessageId to the sdk.
// If the user doesn't set a value for Message.MessageId, the sdk will assign it a random GUID before sending the message.
var options = new ClientOptions
{
SdkAssignsMessageId = Shared.SdkAssignsMessageId.WhenUnset,
};
deviceClient = DeviceClient.CreateFromConnectionString(connectionString, transportType, options);
```

### Send device to cloud telemetry:

```csharp
// This snippet shows you how to call the API for sending telemetry from your device client.
// In order to ensure that your client is resilient to disconnection events and exceptions, refer to https://github.com/Azure-Samples/azure-iot-samples-csharp/blob/master/iot-hub/Samples/device/DeviceReconnectionSample/DeviceReconnectionSample.cs.
var temperature = 25;
var humidity = 70;
string messagePayload = $"{{\"temperature\":{temperature},\"humidity\":{humidity}}}";

using var eventMessage = new Message(Encoding.UTF8.GetBytes(messagePayload))
{
ContentEncoding = Encoding.UTF8.ToString(),
ContentType = "application/json",
};

await deviceClient.SendEventAsync(message);
```

### Receive cloud to device telemetry (using the polling API) and complete the message:

```csharp
// This snippet shows you how to call the API for receiving telemetry sent to your device client.
// In order to ensure that your client is resilient to disconnection events and exceptions, refer to https://github.com/Azure-Samples/azure-iot-samples-csharp/blob/master/iot-hub/Samples/device/DeviceReconnectionSample/DeviceReconnectionSample.cs.
using Message receivedMessage = await deviceClient.ReceiveAsync();
if (receivedMessage == null)
{
Console.WriteLine("No message received; timed out.");
return;
}

string messageData = Encoding.ASCII.GetString(receivedMessage.GetBytes());
var formattedMessage = new StringBuilder($"Received message: [{messageData}]\n");

// User set application properties can be retrieved from the Message.Properties dictionary.
foreach (KeyValuePair<string, string> prop in receivedMessage.Properties)
{
formattedMessage.AppendLine($"\tProperty: key={prop.Key}, value={prop.Value}");
}

// System properties can be accessed using their respective accessors.
formattedMessage.AppendLine($"\tMessageId: {receivedMessage.MessageId}");

Console.WriteLine(formattedMessage.ToString());
await deviceClient.CompleteAsync(receivedMessage);
```

### Receive cloud to device telemetry (using the callback) and complete the message:

```csharp
// This snippet shows you how to call the API for receiving telemetry sent to your device client.
// In order to ensure that your client is resilient to disconnection events and exceptions,
// refer to https://github.com/Azure-Samples/azure-iot-samples-csharp/blob/master/iot-hub/Samples/device/DeviceReconnectionSample/DeviceReconnectionSample.cs.
private async Task OnC2dMessageReceived(Message receivedMessage, object userContext)
{
string messageData = Encoding.ASCII.GetString(receivedMessage.GetBytes());
var formattedMessage = new StringBuilder($"Received message: [{messageData}]\n");

// User set application properties can be retrieved from the Message.Properties dictionary.
foreach (KeyValuePair<string, string> prop in receivedMessage.Properties)
{
formattedMessage.AppendLine($"\tProperty: key={prop.Key}, value={prop.Value}");
}

// System properties can be accessed using their respective accessors.
formattedMessage.AppendLine($"\tMessageId: {receivedMessage.MessageId}");

Console.WriteLine(formattedMessage.ToString());
await deviceClient.CompleteAsync(receivedMessage);
}

// Subscribe to the receive message API.
await deviceClient.SetReceiveMessageHandlerAsync(OnC2dMessageReceived, deviceClient);

// Once you are done receiving telemetry messages sent to your device client,
// you can unsubscribe from the receive callback by setting a null handler.
await deviceClient.SetReceiveMessageHandlerAsync(null, deviceClient);
```

Some examples on how to simulate client reconnection:
- Unplugging the network cable - this will cause a transient network exception to be thrown, which will be retried internally by the SDK.
- Roll over your client instance's shared access key or initialize your client instance with a shared access signature based connection string (with a fixed expiration time for the token) - this will cause the client to return a status of `Disconnected` with a status change reason of `Bad_Credential`. If you perform an operation when the client is in this state, your application will receive an `UnauthorizedException`, which is marked as non-transient. The SDK will not retry in this case.
Expand Down
2 changes: 1 addition & 1 deletion iot-hub/Samples/device/IoTHubDeviceSamples.sln
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ VisualStudioVersion = 16.0.30320.27
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FileUploadSample", "FileUploadSample\FileUploadSample.csproj", "{B0E80EC3-9655-44E6-8671-34404879E557}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MessageSample", "MessageSample\MessageSample.csproj", "{5EF666EB-4C46-47CF-9175-C403DDFB24DD}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MessageReceiveSample", "MessageReceiveSample\MessageReceiveSample.csproj", "{5EF666EB-4C46-47CF-9175-C403DDFB24DD}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MethodSample", "MethodSample\MethodSample.csproj", "{2834C055-7427-4742-BE88-0EC7E0B8FB1D}"
EndProject
Expand Down
108 changes: 108 additions & 0 deletions iot-hub/Samples/device/MessageReceiveSample/MessageReceiveSample.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;

namespace Microsoft.Azure.Devices.Client.Samples
{
/// <summary>
/// This sample demonstrates the two options available for receiving messages sent to a device client instance.
/// You can receive messages either by calling the polling ReceiveAsync() API, or by setting callback to receive messages using SetReceiveMessageHandlerAsync().
/// If you set a callback for receiving messages, any subsequent calls to the polling ReceiveAsync() API will return null.
/// Setting a callback for receiving messages removes the need for you to continuously poll for received messages.
/// It is worth noting that the callback is available only over Mqtt, Mqtt_WebSocket_Only, Mqtt_Tcp_Only, Amqp, Amqp_WebSocket_Only, Amqp_Tcp_only.
/// Http1 does not support setting callbacks since internally we would need to poll for received messages anyway.
/// </summary>
public class MessageReceiveSample
{
private static readonly TimeSpan s_sleepDuration = TimeSpan.FromSeconds(5);
private static readonly TimeSpan s_receiveTimeout = TimeSpan.FromSeconds(10);
private readonly DeviceClient _deviceClient;

public MessageReceiveSample(DeviceClient deviceClient)
{
_deviceClient = deviceClient ?? throw new ArgumentNullException(nameof(deviceClient));
}

public async Task RunSampleAsync()
{
// First receive C2D messages using the polling ReceiveAsync().
Console.WriteLine($"\n{DateTime.Now}> Device waiting for C2D messages from the hub for {s_receiveTimeout}...");
Console.WriteLine($"{DateTime.Now}> Use the Azure Portal IoT Hub blade or Azure IoT Explorer to send a message to this device.");
await ReceiveC2dMessagesPollingAndComplete(s_receiveTimeout);

// Now subscribe to receive C2D messages through a callback.
await _deviceClient.SetReceiveMessageHandlerAsync(OnC2dMessageReceived, _deviceClient);
Console.WriteLine($"\n{DateTime.Now}> Subscribed to receive C2D messages over callback.");

// Now wait to receive C2D messages through the callback.
// Since you are subscribed to receive messages through the callback, any call to the polling ReceiveAsync() API will now return "null".
Console.WriteLine($"\n{DateTime.Now}> Device waiting for C2D messages from the hub for {s_receiveTimeout}...");
Console.WriteLine($"{DateTime.Now}> Use the Azure Portal IoT Hub blade or Azure IoT Explorer to send a message to this device.");
await Task.Delay(s_receiveTimeout);

// Now unsubscibe from receiving the callback.
await _deviceClient.SetReceiveMessageHandlerAsync(null, _deviceClient);
}

private async Task ReceiveC2dMessagesPollingAndComplete(TimeSpan timeout)
{
var sw = new Stopwatch();
sw.Start();

Console.WriteLine($"{DateTime.Now}> Receiving C2D messages on the polling ReceiveAsync().");
while (sw.Elapsed < timeout)
{
using Message receivedMessage = await _deviceClient.ReceiveAsync(timeout);

if (receivedMessage == null)
{
Console.WriteLine($"{DateTime.Now}> Polling ReceiveAsync() - no message received.");
await Task.Delay(s_sleepDuration);
continue;
}

Console.WriteLine($"{DateTime.Now}> Polling ReceiveAsync() - received message with Id={receivedMessage.MessageId}");
ProcessReceivedMessage(receivedMessage);

await _deviceClient.CompleteAsync(receivedMessage);
Console.WriteLine($"{DateTime.Now}> Completed C2D message with Id={receivedMessage.MessageId}.");

receivedMessage.Dispose();
}

sw.Stop();
}

private async Task OnC2dMessageReceived(Message receivedMessage, object _)
{
Console.WriteLine($"{DateTime.Now}> C2D message callback - message received with Id={receivedMessage.MessageId}.");
ProcessReceivedMessage(receivedMessage);

await _deviceClient.CompleteAsync(receivedMessage);
Console.WriteLine($"{DateTime.Now}> Completed C2D message with Id={receivedMessage.MessageId}.");

receivedMessage.Dispose();
}

private void ProcessReceivedMessage(Message receivedMessage)
{
string messageData = Encoding.ASCII.GetString(receivedMessage.GetBytes());
var formattedMessage = new StringBuilder($"Received message: [{messageData}]\n");

// User set application properties can be retrieved from the Message.Properties dictionary.
foreach (KeyValuePair<string, string> prop in receivedMessage.Properties)
{
formattedMessage.AppendLine($"\tProperty: key={prop.Key}, value={prop.Value}");
}
// System properties can be accessed using their respective accessors.
formattedMessage.AppendLine($"\tMessageId: {receivedMessage.MessageId}");

Console.WriteLine($"{DateTime.Now}> {formattedMessage}");
}
}
}
Loading

0 comments on commit 2cb30ee

Please sign in to comment.