Spark Tutorial updated
mp-loki committed Oct 30, 2017
1 parent 11c69a3 commit 7d72cf0
Showing 14 changed files with 200 additions and 53 deletions.
@@ -5,14 +5,13 @@
broker="test.mosquitto.org"
topic_pub='v1/devices/me/telemetry'


client = mqtt.Client()

client.username_pw_set("$YOUR_DEVICE_ACCESS_TOKEN")
client.username_pw_set("$WIND_TURBINE_1_ACCESS_TOKEN")
client.connect('127.0.0.1', 1883, 1)

for i in range(100):
    x = random.randrange(20, 100)
while True:
    x = random.randrange(30, 36)
    print(x)
    msg = '{"windSpeed":"'+ str(x) + '"}'
    client.publish(topic_pub, msg)
18 changes: 18 additions & 0 deletions docs/samples/analytics/resources/send-randomized-windspeed-2.py
@@ -0,0 +1,18 @@
import paho.mqtt.client as mqtt
from time import sleep
import random

broker="test.mosquitto.org"
topic_pub='v1/devices/me/telemetry'

client = mqtt.Client()

client.username_pw_set("$WIND_TURBINE_2_ACCESS_TOKEN")
client.connect('127.0.0.1', 1883, 1)

while True:
    # randrange excludes the upper bound, so values fall in 45..50
    x = random.randrange(45, 51)
    print(x)
    msg = '{"windSpeed":"'+ str(x) + '"}'
    client.publish(topic_pub, msg)
    sleep(0.1)
18 changes: 18 additions & 0 deletions docs/samples/analytics/resources/send-randomized-windspeed-3.py
@@ -0,0 +1,18 @@
import paho.mqtt.client as mqtt
from time import sleep
import random

broker="test.mosquitto.org"
topic_pub='v1/devices/me/telemetry'

client = mqtt.Client()

client.username_pw_set("$WIND_TURBINE_3_ACCESS_TOKEN")
client.connect('127.0.0.1', 1883, 1)

while True:
    # randrange excludes the upper bound, so values fall in 30..60
    x = random.randrange(30, 61)
    print(x)
    msg = '{"windSpeed":"'+ str(x) + '"}'
    client.publish(topic_pub, msg)
    sleep(0.1)
210 changes: 161 additions & 49 deletions docs/samples/analytics/spark-integration-with-thingsboard.md
@@ -62,6 +62,8 @@ and use these [**instructions**](/docs/user-guide/ui/plugins/#plugin-import) to

Don't forget to **activate** your new plugin instance by clicking on the corresponding button in plugin details!

![image](/images/samples/analytics/spark/kafka-plugin-activation.png)

### Step 2. Configuration of Telemetry Forwarding Rule

Now we need to configure the Rule that will be used to push wind speed data from the weather stations to Kafka.
@@ -71,6 +73,8 @@ and use these [**instructions**](/docs/user-guide/ui/rules/#rule-import) to impo

Don't forget to **activate** your new rule instance by clicking on the corresponding button in rule details!

![image](/images/samples/analytics/spark/kafka-rule-activation.png)

Let's review the main rule configuration below.

##### Attributes filter
@@ -103,26 +107,23 @@ Topic name in our case is **'weather-stations-data'** and the message is a valid

### Step 3. Wind Turbine Device Configuration

Let's create a device that we define as 'Wind Turbine 1' and we will send the telemetry data from this device via MQTT:
Now let us create three devices named **Wind Turbine 1**, **Wind Turbine 2** and **Wind Turbine 3**. We will send telemetry data from these devices via MQTT later.
Make sure to set the device type to 'WeatherStation' for all three devices:

![image](/images/samples/analytics/spark/wind-turbine-device.png)
![image](/images/samples/analytics/spark/create-devices.png)

Once added, open the 'Wind Turbine 1' device card and click on 'Copy Access Token' from this device and store it somewhere.
We'll use this token later in Spark Application for sending analytics results back to ThingsBoard and will refer to it as **$GATEWAY_ACCESS_TOKEN**.
Once the devices are added, open each device card, click the 'Copy Access Token' button and store the token somewhere.
We'll use these tokens later in the Python scripts that send MQTT data to ThingsBoard.

![image](/images/samples/analytics/spark/wind-turbine-device-details.png)

### Step 4: Asset Configuration
### Step 4: Create an Asset

Now we have to create an Asset which will receive the aggregated data from the Spark Application.
Add new asset on the Assets screen:
Add a new Asset on the Assets screen. Make sure to set the Asset type to WeatherStation:

![image](/images/samples/analytics/spark/create-asset.png)

When asset is created, click on the asset card and copy the asset ID - we will need it in Spark Application:

![image](/images/samples/analytics/spark/copy-asset-id.png)

## Spark Application

### Step 5. Download the sample application source code
@@ -162,20 +163,19 @@ Dependencies that are used in the sample project:

### Step 7. Source code review

Here is a description of particular code snippet from **SparkKafkaAssetStreamingDemoMain** class.
The Spark Application logic is concentrated mainly in two classes:
- **SparkKafkaStreamingDemoMain** listens to the Kafka topic and calculates the averages.
- **RestClient** finds the ThingsBoard Asset by name and pushes the aggregated data to it.

Below is a description of a particular code snippet from the **SparkKafkaStreamingDemoMain** class.
Main constants are listed below:

```java
// Kafka brokers URL for Spark Streaming to connect to and fetch messages from.
private static final String KAFKA_BROKER_LIST = "localhost:9092";
// URL of ThingsBoard REST endpoint
private static final String THINGSBOARD_REST_ENDPOINT = "http://localhost:8080";
// ThingsBoard User login
private static final String USERNAME = "[email protected]";
// ThingsBoard User password
private static final String PASSWORD = "tenant";
// Asset ID to post the aggregated data into
private static final String ASSET_ID = "$ASSET_ID";
// Time interval in milliseconds of Spark Streaming Job, 10 seconds by default.
private static final int STREAM_WINDOW_MILLISECONDS = 10000; // 10 seconds
// Kafka telemetry topic to subscribe to. This should match the topic in the rule action.
private static final Collection<String> TOPICS = Arrays.asList("weather-stations-data");
```

Main processing logic is listed below:
@@ -184,8 +184,6 @@ Main processing logic is listed below:

try (JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(STREAM_WINDOW_MILLISECONDS))) {

loginRestTemplate();

JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
ssc,
@@ -195,54 +193,109 @@ try (JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(STRE

stream.foreachRDD(rdd ->
{
// Map incoming JSON to WindSpeedData objects

JavaRDD<WindSpeedData> windRdd = rdd.map(new WeatherStationDataMapper());
// Map WindSpeedData objects by GeoZone
// Map incoming JSON to WindSpeedAndGeoZoneData objects
JavaRDD<WindSpeedAndGeoZoneData> windRdd = rdd.map(new WeatherStationDataMapper());
// Map WindSpeedAndGeoZoneData objects by GeoZone
JavaPairRDD<String, AvgWindSpeedData> windByZoneRdd = windRdd.mapToPair(d -> new Tuple2<>(d.getGeoZone(), new AvgWindSpeedData(d.getWindSpeed())));
// Reduce all data volume by GeoZone key
windByZoneRdd = windByZoneRdd.reduceByKey((a, b) -> AvgWindSpeedData.sum(a, b));
// Map <GeoZone, AvgWindSpeedData> back to WindSpeedData
List<WindSpeedData> aggData = windByZoneRdd.map(t -> new WindSpeedData(t._1, t._2.getAvgValue())).collect();
// Push aggregated data to ThingsBoard using Gateway MQTT API
publishTelemetryToThingsBoardAsset(aggData);
// Map <GeoZone, AvgWindSpeedData> back to WindSpeedAndGeoZoneData
List<WindSpeedAndGeoZoneData> aggData = windByZoneRdd.map(t -> new WindSpeedAndGeoZoneData(t._1, t._2.getAvgValue())).collect();
// Push aggregated data to ThingsBoard Asset
restClient.sendTelemetryToAsset(aggData);
});

ssc.start();
ssc.awaitTermination();
}
```
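The map, reduce-by-key and average steps above can be sketched outside Spark in plain Python. This is only an illustration of the aggregation idea: the function name `average_by_zone` is hypothetical, and the running sum and count per zone are an assumption about what `AvgWindSpeedData` tracks, since its internals are not shown here.

```python
from collections import defaultdict


def average_by_zone(readings):
    """Average wind speed per geo zone.

    readings: iterable of (geo_zone, wind_speed) pairs, standing in for
    the mapped records of one streaming window.
    """
    # Assumed accumulator shape: [running sum, count] per zone.
    totals = defaultdict(lambda: [0.0, 0])
    for zone, speed in readings:
        totals[zone][0] += speed
        totals[zone][1] += 1
    # Turn each (sum, count) pair into an average, one entry per zone.
    return {zone: total / count for zone, (total, count) in totals.items()}


if __name__ == "__main__":
    window = [("Zone A", 30), ("Zone A", 50), ("Zone B", 45)]
    print(average_by_zone(window))
```

Keeping a sum and a count, rather than averaging averages, gives the same result regardless of how the records are partitioned before the reduce step.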

The following method is responsible for publishing the telemetry data:
The next section describes the **RestClient** class.
Main constants are listed below:

```java
// ThingsBoard server URL
private static final String THINGSBOARD_REST_ENDPOINT = "http://localhost:8080";
// ThingsBoard Create Asset endpoint
private static final String CREATE_ASSET_ENDPOINT = THINGSBOARD_REST_ENDPOINT + "/api/asset";
// ThingsBoard Get WeatherStation Assets endpoint
private static final String GET_ALL_TENANT_ASSETS_ENDPOINT = THINGSBOARD_REST_ENDPOINT + "/api/tenant/assets?limit=100&type=WeatherStation";
// ThingsBoard Publish Asset telemetry endpoint template
private static final String PUBLISH_ASSET_TELEMETRY_ENDPOINT = THINGSBOARD_REST_ENDPOINT + "/api/plugins/telemetry/ASSET/${ASSET_ID}/timeseries/values";
// ThingsBoard User login
private static final String USERNAME = "[email protected]";
// ThingsBoard User password
private static final String PASSWORD = "tenant";
```

The **getOrCreateAsset** method tries to get the Asset by name from **assetMap**. If no Asset with that name is found, it calls the **createAsset** method, which propagates a new Asset to ThingsBoard:

```java
private void publishTelemetryToThingsBoardAsset(List<WindSpeedData> aggData) throws Exception {
HttpHeaders requestHeaders = new HttpHeaders();
requestHeaders.add("X-Authorization", "Bearer " + token);

if (!aggData.isEmpty()) {
for (WindSpeedData d : aggData) {
HttpEntity<?> httpEntity = new HttpEntity<Object>(d, requestHeaders);
ResponseEntity<Void> result = restTemplate.postForEntity(ASSET_PUBLISH_TELEMETRY_ENDPOINT,
httpEntity, Void.class);
private Asset getOrCreateAsset(String assetName) {
    Asset asset = assetMap.get(assetName);
    if (asset == null) {
        asset = createAsset(assetName);
        assetMap.put(assetName, asset);
    }
    return asset;
}

private Asset createAsset(String assetName) {
    Asset asset = new Asset();
    asset.setName(assetName);
    asset.setType(WEATHER_STATION);
    HttpHeaders requestHeaders = getHttpHeaders();
    HttpEntity<Asset> httpEntity = new HttpEntity<>(asset, requestHeaders);
    ResponseEntity<Asset> responseEntity = restTemplate.postForEntity(CREATE_ASSET_ENDPOINT, httpEntity, Asset.class);
    return responseEntity.getBody();
}
```
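The get-or-create pattern above can be illustrated with a small Python sketch. The class name `AssetCache` and the injected `create_asset` callable are hypothetical stand-ins for **assetMap** and **createAsset**; no ThingsBoard call is made here.

```python
class AssetCache:
    """Caches assets by name and creates a missing one exactly once."""

    def __init__(self, create_asset):
        # create_asset is a hypothetical callable standing in for the
        # REST call that creates the Asset on the server.
        self._create_asset = create_asset
        self._assets = {}

    def get_or_create(self, name):
        asset = self._assets.get(name)
        if asset is None:
            asset = self._create_asset(name)
            self._assets[name] = asset
        return asset


if __name__ == "__main__":
    cache = AssetCache(lambda name: {"name": name, "type": "WeatherStation"})
    first = cache.get_or_create("Zone B")
    second = cache.get_or_create("Zone B")
    print(first is second)
```

Because the lookup goes through the local map first, the create call is issued only the first time a given zone name is seen.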

The following method iterates over the aggregated data list, strips off the **geoZone** attribute and sends the aggregated **windSpeed** to the Asset:

```java
public void sendTelemetryToAsset(List<WindSpeedAndGeoZoneData> aggData) {
    if (aggData.isEmpty()) {
        return;
    }
    for (WindSpeedAndGeoZoneData windSpeedGeoZoneata : aggData) {
        String assetName = windSpeedGeoZoneata.getGeoZone();
        if (StringUtils.isEmpty(assetName)) {
            return;
        }
        Asset asset = getOrCreateAsset(assetName);
        HttpHeaders requestHeaders = getHttpHeaders();
        HttpEntity<?> httpEntity = new HttpEntity<Object>(new WindSpeedData(windSpeedGeoZoneata.getWindSpeed()), requestHeaders);
        String assetPublishEndpoint = getAssetPublishEndpoint(asset.getId().getId());
        restTemplate.postForEntity(assetPublishEndpoint,
                httpEntity, Void.class);

    }
}
```
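As a rough illustration of what each request looks like, the following Python sketch fills the `${ASSET_ID}` placeholder of the publish endpoint template and builds a body that carries only the wind speed. The function name `build_requests`, the `asset_ids` lookup and the `windSpeed` JSON field name are assumptions made for this example. Unlike the Java method, which returns on the first empty zone name, this sketch skips such entries and continues.

```python
import json
from string import Template

# Endpoint template taken from the RestClient constants above.
PUBLISH_TEMPLATE = Template(
    "http://localhost:8080/api/plugins/telemetry/ASSET/${ASSET_ID}/timeseries/values"
)


def build_requests(agg_data, asset_ids):
    """Return (url, json_body) pairs for the aggregated data.

    agg_data: list of (geo_zone, avg_wind_speed) pairs.
    asset_ids: hypothetical dict mapping zone name to Asset ID.
    """
    requests = []
    for geo_zone, wind_speed in agg_data:
        if not geo_zone:
            # Assumption: skip entries without a zone instead of stopping.
            continue
        url = PUBLISH_TEMPLATE.substitute(ASSET_ID=asset_ids[geo_zone])
        # The geoZone is dropped; only the wind speed is sent.
        body = json.dumps({"windSpeed": wind_speed})
        requests.append((url, body))
    return requests


if __name__ == "__main__":
    for url, body in build_requests([("Zone A", 40.0)], {"Zone A": "abc-123"}):
        print(url, body)
```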

Now let's run the **SparkKafkaAssetStreamingDemoMain** class from the IDE or submit it to a Spark cluster. The sample app will fetch all messages from the Kafka topic and send the average wind speed telemetry to the appropriate **'Zone A Asset'** in *ThingsBoard*.
Now let's run the **SparkKafkaStreamingDemoMain** class from the IDE or submit it to a Spark cluster. The sample app will fetch all messages from the Kafka topic and send the average wind speed telemetry to the appropriate Asset in *ThingsBoard*.

## Dry Run

Once the *Kafka Plugin* is configured, the *'Analytics Gateway device'* is provisioned and the *Spark Streaming Application* is running, start sending **windSpeed** telemetry from different devices.

The following command will provision **deviceType** and **geoZone** attributes. You may change zone to different values for different devices.
The following commands will provision the **deviceType** and **geoZone** attributes for the devices.
Let us set **"geoZone":"Zone A"** for **Wind Turbine 1** and **Wind Turbine 2**:

```bash
mosquitto_pub -d -h "localhost" -p 1883 -t "v1/devices/me/attributes" -u "$WIND_TURBINE_1_ACCESS_TOKEN" -m '{"deviceType":"WeatherStation", "geoZone":"Zone A"}'
mosquitto_pub -d -h "localhost" -p 1883 -t "v1/devices/me/attributes" -u "$WIND_TURBINE_2_ACCESS_TOKEN" -m '{"deviceType":"WeatherStation", "geoZone":"Zone A"}'
```

For **Wind Turbine 3**, let us set **"geoZone"** to **"Zone B"**:

```bash
mosquitto_pub -d -h "localhost" -p 1883 -t "v1/devices/me/attributes" -u "$YOUR_DEVICE_ACCESS_TOKEN" -m '{"deviceType":"WeatherStation", "geoZone":"Zone A"}'
mosquitto_pub -d -h "localhost" -p 1883 -t "v1/devices/me/attributes" -u "$WIND_TURBINE_3_ACCESS_TOKEN" -m '{"deviceType":"WeatherStation", "geoZone":"Zone B"}'
```
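The attribute payloads used in the commands above can also be generated programmatically. The sketch below only builds the JSON strings; the names `ZONE_BY_DEVICE` and `attributes_payload` are hypothetical, and actually publishing each payload (for example to the `v1/devices/me/attributes` topic with the device's access token) is left out.

```python
import json

# Device-to-zone assignment used in this tutorial.
ZONE_BY_DEVICE = {
    "Wind Turbine 1": "Zone A",
    "Wind Turbine 2": "Zone A",
    "Wind Turbine 3": "Zone B",
}


def attributes_payload(geo_zone):
    """Build the attributes JSON for a weather station in the given zone."""
    return json.dumps({"deviceType": "WeatherStation", "geoZone": geo_zone})


if __name__ == "__main__":
    for device, zone in ZONE_BY_DEVICE.items():
        print(device, attributes_payload(zone))
```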

The following [**send-randomized-windspeed.py**](/docs/samples/analytics/resources/send-randomized-windspeed.py) script will send 100 randomized windSpeed values to the device:
Now let us send the telemetry for the **Wind Turbine 1**.
The following [**send-randomized-windspeed-1.py**](/docs/samples/analytics/resources/send-randomized-windspeed-1.py) script will send randomized windSpeed values in the range from 30 to 35 inclusive. Replace **$WIND_TURBINE_1_ACCESS_TOKEN** with the actual access token of the device; the script uses this string as the MQTT username as-is and does not expand environment variables:

``` python
import paho.mqtt.client as mqtt
@@ -252,20 +305,79 @@ import random
broker="test.mosquitto.org"
topic_pub='v1/devices/me/telemetry'


client = mqtt.Client()

client.username_pw_set("$YOUR_DEVICE_ACCESS_TOKEN")
client.username_pw_set("$WIND_TURBINE_1_ACCESS_TOKEN")
client.connect('127.0.0.1', 1883, 1)

for i in range(100):
    x = random.randrange(20, 100)
while True:
    x = random.randrange(30, 36)
    print(x)
    msg = '{"windSpeed":"'+ str(x) + '"}'
    client.publish(topic_pub, msg)
    sleep(0.1)
```
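If you would rather keep the token out of the script, one option is to read it from an environment variable. The helper below is a minimal sketch, not part of the sample scripts: the function name `resolve_token` is hypothetical, and it assumes you have exported a variable such as `WIND_TURBINE_1_ACCESS_TOKEN` in your shell.

```python
import os


def resolve_token(var_name, environ=None):
    """Return the access token stored in the given environment variable.

    environ defaults to os.environ; it is a parameter only so that the
    lookup can be exercised without touching the real environment.
    """
    environ = os.environ if environ is None else environ
    token = environ.get(var_name, "").strip()
    if not token:
        raise KeyError("environment variable %s is not set" % var_name)
    return token


if __name__ == "__main__":
    # In the script, the result would replace the literal placeholder:
    # client.username_pw_set(resolve_token("WIND_TURBINE_1_ACCESS_TOKEN"))
    print(resolve_token("DEMO_TOKEN", {"DEMO_TOKEN": "example-token"}))
```

Failing early with a clear error avoids connecting with an empty username, which would otherwise show up only as missing telemetry.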

Once you have sent the telemetry data to ThingsBoard, wait a couple of seconds and then open up the telemetry tab on the asset:
The **Zone A** Asset will start receiving the aggregated windSpeed telemetry:

![image](/images/samples/analytics/spark/asset-telemetry.png)
![image](/images/samples/analytics/spark/zone-a-telemetry-1.png)

Let us keep this script running. Now, in a separate terminal window let's run [**send-randomized-windspeed-2.py**](/docs/samples/analytics/resources/send-randomized-windspeed-2.py) script which will start publishing telemetry data for **Wind Turbine 2**. The windSpeed for **Wind Turbine 2** will be fluctuating between 45 and 50:

```python
import paho.mqtt.client as mqtt
from time import sleep
import random

broker="test.mosquitto.org"
topic_pub='v1/devices/me/telemetry'

client = mqtt.Client()

client.username_pw_set("$WIND_TURBINE_2_ACCESS_TOKEN")
client.connect('127.0.0.1', 1883, 1)

while True:
    # randrange excludes the upper bound, so values fall in 45..50
    x = random.randrange(45, 51)
    print(x)
    msg = '{"windSpeed":"'+ str(x) + '"}'
    client.publish(topic_pub, msg)
    sleep(0.1)
```

Since both **Wind Turbine 1** and **Wind Turbine 2** have the **geoZone** attribute set to **Zone A**, the Spark application will average the windSpeed values from both devices and push the aggregate to the **Zone A** Asset. The **Zone A** Asset will now receive an aggregate windSpeed close to 40:

![image](/images/samples/analytics/spark/zone-a-telemetry-2.png)
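The value of about 40 follows from the two ranges: integers 30 to 35 average 32.5, integers 45 to 50 average 47.5, and with both scripts publishing at the same rate the combined average is (32.5 + 47.5) / 2 = 40. A quick check in Python, assuming every value in each range is equally likely:

```python
from statistics import mean

# All values each script can produce (randrange excludes the upper bound).
turbine_1_values = list(range(30, 36))
turbine_2_values = list(range(45, 51))

mean_1 = mean(turbine_1_values)
mean_2 = mean(turbine_2_values)
# Equal message rates mean both devices contribute equally to the zone.
zone_a_mean = mean(turbine_1_values + turbine_2_values)

print(mean_1, mean_2, zone_a_mean)
```

Within a single 10-second window the observed average will fluctuate around this value, since each window contains only a finite sample of random readings.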

Now let us push telemetry for **Wind Turbine 3**. As you remember, its **geoZone** is **Zone B**, and an Asset with that name does not exist yet. When the Spark application receives telemetry from **Wind Turbine 3**, it will detect that the target Asset is missing and will propagate one to ThingsBoard.

The [**send-randomized-windspeed-3.py**](/docs/samples/analytics/resources/send-randomized-windspeed-3.py) will post telemetry for **Wind Turbine 3** with windSpeed fluctuating between 30 and 60:

```python
import paho.mqtt.client as mqtt
from time import sleep
import random

broker="test.mosquitto.org"
topic_pub='v1/devices/me/telemetry'

client = mqtt.Client()

client.username_pw_set("$WIND_TURBINE_3_ACCESS_TOKEN")
client.connect('127.0.0.1', 1883, 1)

while True:
    # randrange excludes the upper bound, so values fall in 30..60
    x = random.randrange(30, 61)
    print(x)
    msg = '{"windSpeed":"'+ str(x) + '"}'
    client.publish(topic_pub, msg)
    sleep(0.1)
```
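The three scripts differ only in the access token and the value range, so the payload generation could be shared. The following is a sketch of that idea; the function name `wind_speed_payload` and its inclusive `high` parameter are assumptions for this example, and the MQTT publishing part is omitted.

```python
import json
import random


def wind_speed_payload(low, high, rng=random):
    """Return a telemetry JSON string with windSpeed in [low, high].

    high is inclusive here, so 1 is added before calling randrange,
    which excludes its upper bound.
    """
    x = rng.randrange(low, high + 1)
    # The sample scripts send windSpeed as a string, which is kept here.
    return json.dumps({"windSpeed": str(x)})


if __name__ == "__main__":
    print(wind_speed_payload(30, 35))   # range used for Wind Turbine 1
    print(wind_speed_payload(45, 50))   # range used for Wind Turbine 2
    print(wind_speed_payload(30, 60))   # range used for Wind Turbine 3
```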

Now you should see the new **Zone B** Asset propagated on the Assets screen:

![image](/images/samples/analytics/spark/zone-b-propagated.png)

Open the **Zone B** Asset card to check out the telemetry:

![image](/images/samples/analytics/spark/zone-b-telemetry.png)
Binary file removed images/samples/analytics/spark/asset-telemetry.png
Binary file removed images/samples/analytics/spark/copy-asset-id.png
Binary file added images/samples/analytics/spark/create-devices.png