Skip to content

Commit

Permalink
Merge pull request thingsboard#74 from mp-loki/master
Browse files Browse the repository at this point in the history
Kafka Spark integration tutorial updated
  • Loading branch information
ashvayka authored Oct 23, 2017
2 parents 72ec35f + adc6305 commit 11c69a3
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 71 deletions.
19 changes: 19 additions & 0 deletions docs/samples/analytics/resources/send-randomized-windspeed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import paho.mqtt.client as mqtt
from time import sleep
import random

broker="test.mosquitto.org"
topic_pub='v1/devices/me/telemetry'


client = mqtt.Client()

client.username_pw_set("$YOUR_DEVICE_ACCESS_TOKEN")
client.connect('127.0.0.1', 1883, 1)

for i in range(100):
x = random.randrange(20, 100)
print x
msg = '{"windSpeed":"'+ str(x) + '"}'
client.publish(topic_pub, msg)
sleep(0.1)
175 changes: 104 additions & 71 deletions docs/samples/analytics/spark-integration-with-thingsboard.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ThingsBoard is used to collect, store and visualize wind speed from these statio
Once again, this is a completely fake scenario just to demonstrate the integration of all components.

In this scenario we are going to upload wind speed as a telemetry reading, however, the geo-location zone will be a static attribute of the weather station device.
This is logical, since telemetry readings are going to change often, and the geo-location is static.
This makes sense, since telemetry readings are going to change often, and the geo-location is static.

We will analyze real-time data from multiple devices using [Spark Streaming](http://spark.apache.org/docs/latest/streaming-programming-guide.html) job with 10 seconds batch window.

Expand All @@ -41,7 +41,11 @@ So, in our case, the Spark Job itself acts as a gateway that publishes data on b

### Prerequisites

We assume that you have a ThingsBoard [instance](/docs/user-guide/install/installation-options/) up and running.
The following services must be up and running:

* ThingsBoard [instance](/docs/user-guide/install/installation-options/)
* Kafka. This guide assumes that you have it running on localhost on port 9092

We also assume that you are familiar with Kafka and Spark and have also prepared those environments for this tutorial.

### ThingsBoard configuration steps
Expand All @@ -52,9 +56,7 @@ We need to configure Kafka Plugin that will be used to push telemetry data to Ka
You can find the detailed description of Kafka Plugin [here](/docs/reference/plugins/kafka/).

[**Download**](/docs/samples/analytics/resources/kafka_plugin_for_spark_streaming_sample.json) the json with plugin descriptor
and use this [**instructions**](/docs/user-guide/ui/plugins/#plugin-import) to import it to your instance.

Please note that the plugin configuration expects Kafka to be running on the localhost on port 9092.
and use these [**instructions**](/docs/user-guide/ui/plugins/#plugin-import) to import it to your instance.

![image](/images/samples/analytics/spark/kafka-plugin-configuration.png)

Expand All @@ -65,7 +67,7 @@ Don't forget to **activate** your new plugin instance by clicking on the corresp
Now we need to configure the Rule that will be used to push wind speed data from the weather stations to Kafka.

[**Download**](/docs/samples/analytics/resources/windspeed_telemetry_rule.json) the json with plugin descriptor
and use this [**instructions**](/docs/user-guide/ui/rules/#rule-import) to import it to your instance.
and use these [**instructions**](/docs/user-guide/ui/rules/#rule-import) to import it to your instance.

Don't forget to **activate** your new rule instance by clicking on the corresponding button in plugin details!

Expand All @@ -84,8 +86,8 @@ See corresponding [**filter**](/docs/reference/filters/device-attributes-filter/

##### Timeseries data filter

Each device connected to ThingsBoard may upload multiple telemetry keys simultaneously on independently.
In some use cases, you may need to process only a certain sub-set of the data. We will use telemetry data filter to achieve this.
Each device connected to ThingsBoard may upload multiple telemetry keys simultaneously or independently.
In some use cases, you may need to process only a certain sub-set of data. We will use telemetry data filter to achieve this.

The filter expression below validates that **windSpeed** telemetry is present in the processed message.

Expand All @@ -99,25 +101,35 @@ Topic name in our case is **'weather-stations-data'** and the message is a valid

![image](/images/samples/analytics/spark/kafka-rule-action.png)

### Step 3. Configuration of the Analytics Gateway device
### Step 3. Wind Turbine Device Configuration

Let's create a device that we define 'Analytics Gateway' and we'll send average temperature from Spark Application to this device:
Let's create a device that we define as 'Wind Turbine 1' and we will send the telemetry data from this device via MQTT:

![image](/images/samples/analytics/spark/gateway-device.png)
![image](/images/samples/analytics/spark/wind-turbine-device.png)

Once added, open the 'Analytics Gateway' device card and click on copy 'Access Token' from this device and store it somewhere.
Once added, open the 'Wind Turbine 1' device card and click on 'Copy Access Token' from this device and store it somewhere.
We'll use this token later in Spark Application for sending analytics results back to ThingsBoard and will refer to it as **$GATEWAY_ACCESS_TOKEN**.

![image](/images/samples/analytics/spark/gateway-device-details.png)
![image](/images/samples/analytics/spark/wind-turbine-device-details.png)

We'll use this token later in Spark Application for sending analytics results back to ThingsBoard and will refer to it as **$GATEWAY_ACCESS_TOKEN**.
### Step 4: Asset Configuration

Now we have to create an Asset which will receive the aggregated data from Sparkk Application.
Add new asset on the Assets screen:

![image](/images/samples/analytics/spark/create-asset.png)

When asset is created, click on the asset card and copy the asset ID - we will need it in Spark Application:

![image](/images/samples/analytics/spark/copy-asset-id.png)

## Spark Application

### Step 4. Download the sample application source code
### Step 5. Download the sample application source code

Feel free to grab the [code from this sample ThingsBoard repository](https://github.com/thingsboard/samples/tree/master/spark-kafka-streaming-integration) and follow along.

### Step 5. Dependencies review
### Step 6. Dependencies review

The sample application was developed using Spark version **2.1.0**. Please consider this if you use a different version of Spark because in this case you may need to use a different version of Kafka Streaming API as well.

Expand All @@ -140,30 +152,30 @@ Dependencies that are used in the sample project:
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark-version}</version>
</dependency>
<!-- MQTT client dependency to send messages to ThingsBoard -->
<!-- HTTP Client to to send messages to ThingsBoard through REST API-->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>${paho.client.version}</version>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
```

### Step 5. Source code review
### Step 7. Source code review

Here is a description of particular code snippet from **SparkKafkaStreamingDemoMain** class.
Here is a description of particular code snippet from **SparkKafkaAssetStreamingDemoMain** class.
Main constants are listed below:

```java
// Access token for 'Analytics Gateway' Device.
private static final String GATEWAY_ACCESS_TOKEN = "$GATEWAY_ACCESS_TOKEN";
// Kafka brokers URL for Spark Streaming to connect and fetched messages from.
private static final String KAFKA_BROKER_LIST = "localhost:9092";
// URL of ThingsBoard MQTT endpoint
private static final String THINGSBOARD_MQTT_ENDPOINT = "tcp://localhost:1883";
// Time interval in milliseconds of Spark Streaming Job, 10 seconds by default.
private static final int STREAM_WINDOW_MILLISECONDS = 10000; // 10 seconds
// Kafka telemetry topic to subscribe to. This should match to the topic in the rule action.
private static final Collection<String> TOPICS = Arrays.asList("weather-stations-data");
// URL of ThingsBoard REST endpoint
private static final String THINGSBOARD_REST_ENDPOINT = "http://localhost:8080";
// ThingsBoard User login
private static final String USERNAME = "[email protected]";
// ThingsBoard User password
private static final String PASSWORD = "tenant";
// Asset ID to post the aggregated data inot
private static final String ASSET_ID = "$ASSET_ID";
```

Main processing logic is listed below:
Expand All @@ -172,50 +184,53 @@ Main processing logic is listed below:

try (JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(STREAM_WINDOW_MILLISECONDS))) {

connectToThingsboard();

JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(TOPICS, getKafkaParams())
);

stream.foreachRDD(rdd ->
{
// Map incoming JSON to WindSpeedData objects
JavaRDD<WindSpeedData> windRdd = rdd.map(new WeatherStationDataMapper());
// Map WindSpeedData objects by GeoZone
JavaPairRDD<String, AvgWindSpeedData> windByZoneRdd = windRdd.mapToPair(d -> new Tuple2<>(d.getGeoZone(), new AvgWindSpeedData(d.getWindSpeed())));
// Reduce all data volume by GeoZone key
windByZoneRdd = windByZoneRdd.reduceByKey((a, b) -> AvgWindSpeedData.sum(a, b));
// Map <GeoZone, AvgWindSpeedData> back to WindSpeedData
List<WindSpeedData> aggData = windByZoneRdd.map(t -> new WindSpeedData(t._1, t._2.getAvgValue())).collect();
// Push aggregated data to ThingsBoard using Gateway MQTT API
publishTelemetryToThingsboard(aggData);
});

ssc.start();
ssc.awaitTermination();
loginRestTemplate();

JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(TOPICS, getKafkaParams())
);

stream.foreachRDD(rdd ->
{
// Map incoming JSON to WindSpeedData objects

JavaRDD<WindSpeedData> windRdd = rdd.map(new WeatherStationDataMapper());
// Map WindSpeedData objects by GeoZone
JavaPairRDD<String, AvgWindSpeedData> windByZoneRdd = windRdd.mapToPair(d -> new Tuple2<>(d.getGeoZone(), new AvgWindSpeedData(d.getWindSpeed())));
// Reduce all data volume by GeoZone key
windByZoneRdd = windByZoneRdd.reduceByKey((a, b) -> AvgWindSpeedData.sum(a, b));
// Map <GeoZone, AvgWindSpeedData> back to WindSpeedData
List<WindSpeedData> aggData = windByZoneRdd.map(t -> new WindSpeedData(t._1, t._2.getAvgValue())).collect();
// Push aggregated data to ThingsBoard using Gateway MQTT API
publishTelemetryToThingsBoardAsset(aggData);
});

ssc.start();
ssc.awaitTermination();
}
```

Following method is responsbile for publishing the telemetry data:
The following method is responsbile for publishing the telemetry data:

```java
private void publishTelemetryToThingsboard(List<WindSpeedData> aggData) throws Exception {
if (!aggData.isEmpty()) {
for (WindSpeedData d : aggData) {
MqttMessage connectMsg = new MqttMessage(toConnectJson(d.getGeoZone()).getBytes(StandardCharsets.UTF_8));
client.publish("v1/gateway/connect", connectMsg, null, getCallback());
}
MqttMessage dataMsg = new MqttMessage(toDataJson(aggData).getBytes(StandardCharsets.UTF_8));
client.publish("v1/gateway/telemetry", dataMsg, null, getCallback());
}
private void publishTelemetryToThingsBoardAsset(List<WindSpeedData> aggData) throws Exception {
HttpHeaders requestHeaders = new HttpHeaders();
requestHeaders.add("X-Authorization", "Bearer " + token);

if (!aggData.isEmpty()) {
for (WindSpeedData d : aggData) {
HttpEntity<?> httpEntity = new HttpEntity<Object>(d, requestHeaders);
ResponseEntity<Void> result = restTemplate.postForEntity(ASSET_PUBLISH_TELEMETRY_ENDPOINT,
httpEntity, Void.class);
}
}
}
```

Now let's run **SparkKafkaStreamingDemoMain** class from the IDE or submit it to Spark cluster. Sample app will be fetching all the messages from Kafka topic and send average temperature telemetry to appropriate **'Average Temperature Device'** in *ThingsBoard*.
Now let's run **SparkKafkaAssetStreamingDemoMain** class from the IDE or submit it to a Spark cluster. Sample app will be fetching all the messages from Kafka topic and send average wind speed telemetry to the appropriate **'Zone A Asset'** in *ThingsBoard*.

## Dry Run

Expand All @@ -227,12 +242,30 @@ The following command will provision **deviceType** and **geoZone** attributes.
mosquitto_pub -d -h "localhost" -p 1883 -t "v1/devices/me/attributes" -u "$YOUR_DEVICE_ACCESS_TOKEN" -m '{"deviceType":"WeatherStation", "geoZone":"Zone A"}'
```

The following command will report **windSpeed** telemetry for a particular device.
The following [**send-randomized-windspeed.py**](/docs/samples/analytics/resources/send-randomized-windspeed.py) script will send 100 randomized windSpeed values to the device:

```bash
mosquitto_pub -d -h "localhost" -p 1883 -t "v1/devices/me/telemetry" -u "$YOUR_DEVICE_ACCESS_TOKEN" -m '{"windSpeed":42}'
``` python
import paho.mqtt.client as mqtt
from time import sleep
import random

broker="test.mosquitto.org"
topic_pub='v1/devices/me/telemetry'


client = mqtt.Client()

client.username_pw_set("$YOUR_DEVICE_ACCESS_TOKEN")
client.connect('127.0.0.1', 1883, 1)

for i in range(100):
x = random.randrange(20, 100)
print x
msg = '{"windSpeed":"'+ str(x) + '"}'
client.publish(topic_pub, msg)
sleep(0.1)
```

Once you sent the telemetry data to ThingsBoard, wait up to 10 seconds for new device that identifies your **geoZone** to become available.
Once you have sent the telemetry data to ThingsBoard, wait a couple of seconds and then open up the telemetry tab on the asset:

![image](/images/samples/analytics/spark/zone-device-telemetry.png)
![image](/images/samples/analytics/spark/asset-telemetry.png)
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/samples/analytics/spark/copy-asset-id.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/samples/analytics/spark/create-asset.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file not shown.
Binary file removed images/samples/analytics/spark/gateway-device.png
Binary file not shown.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified images/samples/analytics/spark/zone-device-telemetry.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 11c69a3

Please sign in to comment.