forked from thingsboard/thingsboard.github.io
Commit
Merge pull request thingsboard#75 from mp-loki/master
Spark Tutorial updated
Showing 14 changed files with 200 additions and 53 deletions.
18 changes: 18 additions & 0 deletions
docs/samples/analytics/resources/send-randomized-windspeed-2.py
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
import paho.mqtt.client as mqtt | ||
from time import sleep | ||
import random | ||
|
||
broker="test.mosquitto.org" | ||
topic_pub='v1/devices/me/telemetry' | ||
|
||
client = mqtt.Client() | ||
|
||
client.username_pw_set("$WIND_TURBINE_2_ACCESS_TOKEN") | ||
client.connect('127.0.0.1', 1883, 1) | ||
|
||
while True: | ||
x = random.randrange(45, 51) | ||
print x | ||
msg = '{"windSpeed":"'+ str(x) + '"}' | ||
client.publish(topic_pub, msg) | ||
sleep(0.1) |
18 changes: 18 additions & 0 deletions
docs/samples/analytics/resources/send-randomized-windspeed-3.py
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
import paho.mqtt.client as mqtt | ||
from time import sleep | ||
import random | ||
|
||
broker="test.mosquitto.org" | ||
topic_pub='v1/devices/me/telemetry' | ||
|
||
client = mqtt.Client() | ||
|
||
client.username_pw_set("$WIND_TURBINE_3_ACCESS_TOKEN") | ||
client.connect('127.0.0.1', 1883, 1) | ||
|
||
while True: | ||
x = random.randrange(30, 61) | ||
print x | ||
msg = '{"windSpeed":"'+ str(x) + '"}' | ||
client.publish(topic_pub, msg) | ||
sleep(0.1) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,6 +62,8 @@ and use these [**instructions**](/docs/user-guide/ui/plugins/#plugin-import) to | |
|
||
Don't forget to **activate** your new plugin instance by clicking on the corresponding button in plugin details! | ||
|
||
![image](/images/samples/analytics/spark/kafka-plugin-activation.png) | ||
|
||
### Step 2. Configuration of Telemetry Forwarding Rule | ||
|
||
Now we need to configure the Rule that will be used to push wind speed data from the weather stations to Kafka. | ||
|
@@ -71,6 +73,8 @@ and use these [**instructions**](/docs/user-guide/ui/rules/#rule-import) to impo | |
|
||
Don't forget to **activate** your new rule instance by clicking on the corresponding button in plugin details! | ||
|
||
![image](/images/samples/analytics/spark/kafka-rule-activation.png) | ||
|
||
Let's review the main rule configuration below. | ||
|
||
##### Attributes filter | ||
|
@@ -103,26 +107,23 @@ Topic name in our case is **'weather-stations-data'** and the message is a valid | |
|
||
### Step 3. Wind Turbine Device Configuration | ||
|
||
Let's create a device that we define as 'Wind Turbine 1' and we will send the telemetry data from this device via MQTT: | ||
Now let us create three devices that we define as **Wind Turbine 1**, **Wind Turbine 2** and **Wind Turbine 3**. We will send the telemetry data from these devices via MQTT later. | ||
Please, make sure to set the device type to 'WeatherStation' for all three devices: | ||
|
||
![image](/images/samples/analytics/spark/wind-turbine-device.png) | ||
![image](/images/samples/analytics/spark/create-devices.png) | ||
|
||
Once added, open the 'Wind Turbine 1' device card and click on 'Copy Access Token' from this device and store it somewhere. | ||
We'll use this token later in Spark Application for sending analytics results back to ThingsBoard and will refer to it as **$GATEWAY_ACCESS_TOKEN**. | ||
Once added, for each device open the device card, click on 'Copy Access Token' button and store the token somewhere. | ||
We'll use these tokens later in Python scripts sending MQTT data to ThingsBoard. | ||
|
||
![image](/images/samples/analytics/spark/wind-turbine-device-details.png) | ||
|
||
### Step 4: Asset Configuration | ||
### Step 4: Create an Asset | ||
|
||
Now we have to create an Asset which will receive the aggregated data from the Spark Application. | ||
Add new asset on the Assets screen: | ||
Add new Asset on the Assets screen. Please, make sure to set the Asset type to WeatherStation: | ||
|
||
![image](/images/samples/analytics/spark/create-asset.png) | ||
|
||
When asset is created, click on the asset card and copy the asset ID - we will need it in Spark Application: | ||
|
||
![image](/images/samples/analytics/spark/copy-asset-id.png) | ||
|
||
## Spark Application | ||
|
||
### Step 5. Download the sample application source code | ||
|
@@ -162,20 +163,19 @@ Dependencies that are used in the sample project: | |
|
||
### Step 7. Source code review | ||
|
||
Here is a description of a particular code snippet from the **SparkKafkaAssetStreamingDemoMain** class. | ||
The Spark Application logic is concentrated mainly in two classes: | ||
- **SparkKafkaStreamingDemoMain** listens to the Kafka topic and calculates the averages. | ||
- **RestClient** finds the ThingsBoard Asset by name and pushes the aggregated data to it. | ||
Below is a description of a particular code snippet from the **SparkKafkaStreamingDemoMain** class. | ||
Main constants are listed below: | ||
|
||
```java | ||
// Kafka brokers URL for Spark Streaming to connect to and fetch messages from. | ||
private static final String KAFKA_BROKER_LIST = "localhost:9092"; | ||
// URL of ThingsBoard REST endpoint | ||
private static final String THINGSBOARD_REST_ENDPOINT = "http://localhost:8080"; | ||
// ThingsBoard User login | ||
private static final String USERNAME = "tenant@thingsboard.org"; | ||
// ThingsBoard User password | ||
private static final String PASSWORD = "tenant"; | ||
// Asset ID to post the aggregated data into | ||
private static final String ASSET_ID = "$ASSET_ID"; | ||
// Time interval in milliseconds of Spark Streaming Job, 10 seconds by default. | ||
private static final int STREAM_WINDOW_MILLISECONDS = 10000; // 10 seconds | ||
// Kafka telemetry topic to subscribe to. This should match the topic in the rule action. | ||
private static final Collection<String> TOPICS = Arrays.asList("weather-stations-data"); | ||
``` | ||
|
||
Main processing logic is listed below: | ||
|
@@ -184,8 +184,6 @@ Main processing logic is listed below: | |
|
||
try (JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(STREAM_WINDOW_MILLISECONDS))) { | ||
|
||
loginRestTemplate(); | ||
|
||
JavaInputDStream<ConsumerRecord<String, String>> stream = | ||
KafkaUtils.createDirectStream( | ||
ssc, | ||
|
@@ -195,54 +193,109 @@ try (JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(STRE | |
|
||
stream.foreachRDD(rdd -> | ||
{ | ||
// Map incoming JSON to WindSpeedData objects | ||
|
||
JavaRDD<WindSpeedData> windRdd = rdd.map(new WeatherStationDataMapper()); | ||
// Map WindSpeedData objects by GeoZone | ||
// Map incoming JSON to WindSpeedAndGeoZoneData objects | ||
JavaRDD<WindSpeedAndGeoZoneData> windRdd = rdd.map(new WeatherStationDataMapper()); | ||
// Map WindSpeedAndGeoZoneData objects by GeoZone | ||
JavaPairRDD<String, AvgWindSpeedData> windByZoneRdd = windRdd.mapToPair(d -> new Tuple2<>(d.getGeoZone(), new AvgWindSpeedData(d.getWindSpeed()))); | ||
// Reduce all data volume by GeoZone key | ||
windByZoneRdd = windByZoneRdd.reduceByKey((a, b) -> AvgWindSpeedData.sum(a, b)); | ||
// Map <GeoZone, AvgWindSpeedData> back to WindSpeedData | ||
List<WindSpeedData> aggData = windByZoneRdd.map(t -> new WindSpeedData(t._1, t._2.getAvgValue())).collect(); | ||
// Push aggregated data to ThingsBoard using Gateway MQTT API | ||
publishTelemetryToThingsBoardAsset(aggData); | ||
// Map <GeoZone, AvgWindSpeedData> back to WindSpeedAndGeoZoneData | ||
List<WindSpeedAndGeoZoneData> aggData = windByZoneRdd.map(t -> new WindSpeedAndGeoZoneData(t._1, t._2.getAvgValue())).collect(); | ||
// Push aggregated data to ThingsBoard Asset | ||
restClient.sendTelemetryToAsset(aggData); | ||
}); | ||
|
||
ssc.start(); | ||
ssc.awaitTermination(); | ||
} | ||
``` | ||
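The per-zone averaging performed by the Spark job can be sketched outside Spark in a few lines of Python. This is only an illustration: it assumes that **AvgWindSpeedData** keeps a running sum and a count (its internals are not shown here), and the function name `average_by_zone` and the sample readings are hypothetical.

```python
from collections import defaultdict


def average_by_zone(readings):
    """Average wind speed per geoZone.

    `readings` is an iterable of (geo_zone, wind_speed) pairs, standing in
    for the WindSpeedAndGeoZoneData objects of the Java code.
    """
    # zone -> [sum, count]; assumed to mirror what AvgWindSpeedData.sum combines
    totals = defaultdict(lambda: [0.0, 0])
    for zone, speed in readings:
        acc = totals[zone]
        acc[0] += speed
        acc[1] += 1
    # Equivalent of mapping <GeoZone, AvgWindSpeedData> back to an average value
    return {zone: total / count for zone, (total, count) in totals.items()}


# Hypothetical readings from one streaming window
readings = [("Zone A", 32), ("Zone A", 48), ("Zone B", 45)]
averages = average_by_zone(readings)
print(averages)
```

Keeping a sum and a count per key, instead of averaging pairwise, is what makes the reduction safe to apply in any order, which is the property `reduceByKey` relies on.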
|
||
The following method is responsible for publishing the telemetry data: | ||
The next section describes the **RestClient** class. | ||
Main constants are listed below: | ||
|
||
```java | ||
// ThingsBoard server URL | ||
private static final String THINGSBOARD_REST_ENDPOINT = "http://localhost:8080"; | ||
// ThingsBoard Create Asset endpoint | ||
private static final String CREATE_ASSET_ENDPOINT = THINGSBOARD_REST_ENDPOINT + "/api/asset"; | ||
// ThingsBoard Get WeatherStation Assets endpoint | ||
private static final String GET_ALL_TENANT_ASSETS_ENDPOINT = THINGSBOARD_REST_ENDPOINT + "/api/tenant/assets?limit=100&type=WeatherStation"; | ||
// ThingsBoard Publish Asset telemetry endpoint template | ||
private static final String PUBLISH_ASSET_TELEMETRY_ENDPOINT = THINGSBOARD_REST_ENDPOINT + "/api/plugins/telemetry/ASSET/${ASSET_ID}/timeseries/values"; | ||
// ThingsBoard User login | ||
private static final String USERNAME = "tenant@thingsboard.org"; | ||
// ThingsBoard User password | ||
private static final String PASSWORD = "tenant"; | ||
``` | ||
|
||
The **getOrCreateAsset** method tries to get the Asset by name from **assetMap**. If no Asset with that name is found, it calls the **createAsset** method, which creates a new Asset in ThingsBoard: | ||
|
||
```java | ||
private void publishTelemetryToThingsBoardAsset(List<WindSpeedData> aggData) throws Exception { | ||
HttpHeaders requestHeaders = new HttpHeaders(); | ||
requestHeaders.add("X-Authorization", "Bearer " + token); | ||
|
||
if (!aggData.isEmpty()) { | ||
for (WindSpeedData d : aggData) { | ||
HttpEntity<?> httpEntity = new HttpEntity<Object>(d, requestHeaders); | ||
ResponseEntity<Void> result = restTemplate.postForEntity(ASSET_PUBLISH_TELEMETRY_ENDPOINT, | ||
httpEntity, Void.class); | ||
private Asset getOrCreateAsset(String assetName) { | ||
Asset asset = assetMap.get(assetName); | ||
if (asset == null) { | ||
asset = createAsset(assetName); | ||
assetMap.put(assetName, asset); | ||
} | ||
return asset; | ||
} | ||
|
||
private Asset createAsset(String assetName) { | ||
Asset asset = new Asset(); | ||
asset.setName(assetName); | ||
asset.setType(WEATHER_STATION); | ||
HttpHeaders requestHeaders = getHttpHeaders(); | ||
HttpEntity<Asset> httpEntity = new HttpEntity<>(asset, requestHeaders); | ||
ResponseEntity<Asset> responseEntity = restTemplate.postForEntity(CREATE_ASSET_ENDPOINT, httpEntity, Asset.class); | ||
return responseEntity.getBody(); | ||
} | ||
``` | ||
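The get-or-create pattern used by **getOrCreateAsset** can be illustrated with a small Python sketch. The class `AssetCache` and the stand-in `fake_create` function are hypothetical; in the real application the creation step is the REST call made by **createAsset**.

```python
class AssetCache:
    """Caches assets by name and creates missing ones on demand."""

    def __init__(self, create_remote):
        self._assets = {}                    # plays the role of assetMap
        self._create_remote = create_remote  # stand-in for createAsset

    def get_or_create(self, name):
        asset = self._assets.get(name)
        if asset is None:
            # Only reached the first time a name is seen
            asset = self._create_remote(name)
            self._assets[name] = asset
        return asset


created = []


def fake_create(name):
    # Hypothetical replacement for the POST to the asset endpoint
    created.append(name)
    return {"name": name, "type": "WeatherStation"}


cache = AssetCache(fake_create)
first = cache.get_or_create("Zone B")
second = cache.get_or_create("Zone B")
print(created)
```

The second lookup is served from the cache, so the remote creation runs once per asset name for the lifetime of the process.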
|
||
The following method iterates over the aggregated data list, strips off the **geoZone** attribute and sends the aggregated **windSpeed** to the Asset: | ||
|
||
```java | ||
public void sendTelemetryToAsset(List<WindSpeedAndGeoZoneData> aggData) { | ||
if (aggData.isEmpty()) { | ||
return; | ||
} | ||
for (WindSpeedAndGeoZoneData windSpeedGeoZoneata : aggData) { | ||
String assetName = windSpeedGeoZoneata.getGeoZone(); | ||
if (StringUtils.isEmpty(assetName)) { | ||
return; | ||
} | ||
Asset asset = getOrCreateAsset(assetName); | ||
HttpHeaders requestHeaders = getHttpHeaders(); | ||
HttpEntity<?> httpEntity = new HttpEntity<Object>(new WindSpeedData(windSpeedGeoZoneata.getWindSpeed()), requestHeaders); | ||
String assetPublishEndpoint = getAssetPublishEndpoint(asset.getId().getId()); | ||
restTemplate.postForEntity(assetPublishEndpoint, | ||
httpEntity, Void.class); | ||
|
||
} | ||
} | ||
``` | ||
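The endpoint construction and the per-record loop can be sketched in Python as follows. The sketch makes several assumptions: **getAssetPublishEndpoint** is taken to substitute the asset ID into the `${ASSET_ID}` placeholder, the request body is taken to be a JSON object with a single `windSpeed` field, and the names `build_requests` and `asset_ids` are hypothetical. Note one deliberate difference: the sketch skips a record with an empty zone and continues, whereas the Java method above returns at the first such record.

```python
from string import Template

# Same template string as PUBLISH_ASSET_TELEMETRY_ENDPOINT above
PUBLISH_TEMPLATE = Template(
    "http://localhost:8080/api/plugins/telemetry/ASSET/${ASSET_ID}/timeseries/values"
)


def build_requests(agg_data, asset_ids):
    """Return (url, body) pairs for each aggregated record.

    `agg_data` holds (geo_zone, wind_speed) pairs; `asset_ids` maps a
    zone name to an asset ID (hypothetical stand-in for getOrCreateAsset).
    """
    requests = []
    for geo_zone, wind_speed in agg_data:
        if not geo_zone:
            continue  # skip records without a zone, keep processing the rest
        url = PUBLISH_TEMPLATE.substitute(ASSET_ID=asset_ids[geo_zone])
        # geoZone is stripped; only windSpeed is sent to the Asset
        requests.append((url, {"windSpeed": wind_speed}))
    return requests


planned = build_requests(
    [("Zone A", 40.0), ("", 12.0), ("Zone B", 45.5)],
    {"Zone A": "id-a", "Zone B": "id-b"},
)
for url, body in planned:
    print(url, body)
```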
|
||
Now let's run the **SparkKafkaAssetStreamingDemoMain** class from the IDE or submit it to a Spark cluster. The sample app will fetch all the messages from the Kafka topic and send average wind speed telemetry to the appropriate **'Zone A Asset'** in *ThingsBoard*. | ||
Now let's run the **SparkKafkaStreamingDemoMain** class from the IDE or submit it to a Spark cluster. The sample app will fetch all the messages from the Kafka topic and send average wind speed telemetry to the appropriate Asset in *ThingsBoard*. | ||
|
||
## Dry Run | ||
|
||
Once *Kafka Plugin* is configured, *'Analytics Gateway device'* is provisioned and *Spark Streaming Application* is running, please start sending **windSpeed** telemetry from different devices. | ||
|
||
The following command will provision **deviceType** and **geoZone** attributes. You may change zone to different values for different devices. | ||
The following commands will provision **deviceType** and **geoZone** attributes for the devices. | ||
Let us provision **"geoZone":"Zone A"** for **Wind Turbine 1** and **Wind Turbine 2**: | ||
|
||
```bash | ||
mosquitto_pub -d -h "localhost" -p 1883 -t "v1/devices/me/attributes" -u "$WIND_TURBINE_1_ACCESS_TOKEN" -m '{"deviceType":"WeatherStation", "geoZone":"Zone A"}' | ||
mosquitto_pub -d -h "localhost" -p 1883 -t "v1/devices/me/attributes" -u "$WIND_TURBINE_2_ACCESS_TOKEN" -m '{"deviceType":"WeatherStation", "geoZone":"Zone A"}' | ||
``` | ||
|
||
For **Wind Turbine 3** let us set **"geoZone"** to **"Zone B"**: | ||
|
||
```bash | ||
mosquitto_pub -d -h "localhost" -p 1883 -t "v1/devices/me/attributes" -u "$YOUR_DEVICE_ACCESS_TOKEN" -m '{"deviceType":"WeatherStation", "geoZone":"Zone A"}' | ||
mosquitto_pub -d -h "localhost" -p 1883 -t "v1/devices/me/attributes" -u "$WIND_TURBINE_3_ACCESS_TOKEN" -m '{"deviceType":"WeatherStation", "geoZone":"Zone B"}' | ||
``` | ||
|
||
The following [**send-randomized-windspeed.py**](/docs/samples/analytics/resources/send-randomized-windspeed.py) script will send 100 randomized windSpeed values to the device: | ||
Now let us send the telemetry for the **Wind Turbine 1**. | ||
The following [**send-randomized-windspeed-1.py**](/docs/samples/analytics/resources/send-randomized-windspeed-1.py) script will send randomized windSpeed values in the range from 30 to 35 inclusive. Replace the **$WIND_TURBINE_1_ACCESS_TOKEN** placeholder with the actual token value; the script passes this string literally and does not expand environment variables: | ||
|
||
``` python | ||
import paho.mqtt.client as mqtt | ||
|
@@ -252,20 +305,79 @@ import random | |
broker="test.mosquitto.org" | ||
topic_pub='v1/devices/me/telemetry' | ||
|
||
|
||
client = mqtt.Client() | ||
|
||
client.username_pw_set("$YOUR_DEVICE_ACCESS_TOKEN") | ||
client.username_pw_set("$WIND_TURBINE_1_ACCESS_TOKEN") | ||
client.connect('127.0.0.1', 1883, 1) | ||
|
||
for i in range(100): | ||
x = random.randrange(20, 100) | ||
while True: | ||
x = random.randrange(30, 36) | ||
print x | ||
msg = '{"windSpeed":"'+ str(x) + '"}' | ||
client.publish(topic_pub, msg) | ||
sleep(0.1) | ||
``` | ||
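The script above uses the Python 2 `print` statement. As a rough Python 3 sketch of just the payload-building step (no MQTT connection involved), the snippet below shows that `random.randrange(30, 36)` excludes its upper bound and therefore yields values from 30 to 35. The helper name `make_payload` is hypothetical; the `windSpeed` value is kept as a string to match the message format of the script.

```python
import json
import random


def make_payload(low, high, rng=random):
    """Build a telemetry message with windSpeed in [low, high] inclusive."""
    # randrange excludes the upper bound, hence high + 1
    value = rng.randrange(low, high + 1)
    return json.dumps({"windSpeed": str(value)})


samples = [json.loads(make_payload(30, 35)) for _ in range(1000)]
values = {int(sample["windSpeed"]) for sample in samples}
print(sorted(values))
```

Building the message with `json.dumps` instead of string concatenation avoids quoting mistakes if more fields are added later.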
|
||
Once you have sent the telemetry data to ThingsBoard, wait a couple of seconds and then open up the telemetry tab on the asset: | ||
The **Zone A** Asset will start receiving the aggregated windSpeed telemetry: | ||
|
||
![image](/images/samples/analytics/spark/asset-telemetry.png) | ||
![image](/images/samples/analytics/spark/zone-a-telemetry-1.png) | ||
|
||
Let us keep this script running. Now, in a separate terminal window let's run [**send-randomized-windspeed-2.py**](/docs/samples/analytics/resources/send-randomized-windspeed-2.py) script which will start publishing telemetry data for **Wind Turbine 2**. The windSpeed for **Wind Turbine 2** will be fluctuating between 45 and 50: | ||
|
||
```python | ||
import paho.mqtt.client as mqtt | ||
from time import sleep | ||
import random | ||
|
||
broker="test.mosquitto.org" | ||
topic_pub='v1/devices/me/telemetry' | ||
|
||
client = mqtt.Client() | ||
|
||
client.username_pw_set("$WIND_TURBINE_2_ACCESS_TOKEN") | ||
client.connect('127.0.0.1', 1883, 1) | ||
|
||
while True: | ||
x = random.randrange(45, 51) | ||
print x | ||
msg = '{"windSpeed":"'+ str(x) + '"}' | ||
client.publish(topic_pub, msg) | ||
sleep(0.1) | ||
``` | ||
|
||
Since both **Wind Turbine 1** and **Wind Turbine 2** have the **geoZone** attribute set to **Zone A**, the Spark application will average the windSpeed values from both devices and push the aggregate to the **Zone A** Asset. The **Zone A** Asset will now receive an aggregate windSpeed close to 40: | ||
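The value of 40 follows from the two ranges, as this small Python check shows. It assumes both scripts publish at the same rate (each sleeps 0.1 s between messages), so every streaming window contains roughly the same number of samples from each device.

```python
from statistics import mean

turbine_1 = range(30, 36)  # values 30..35, as in send-randomized-windspeed-1.py
turbine_2 = range(45, 51)  # values 45..50, as in send-randomized-windspeed-2.py

mean_1 = mean(turbine_1)   # expected value for Wind Turbine 1
mean_2 = mean(turbine_2)   # expected value for Wind Turbine 2

# With equal sample counts, the zone average is the mean of the two means
zone_a = (mean_1 + mean_2) / 2
print(mean_1, mean_2, zone_a)
```

Individual windows will fluctuate around this figure because each one averages a finite number of random samples.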
|
||
![image](/images/samples/analytics/spark/zone-a-telemetry-2.png) | ||
|
||
Now let us push telemetry for **Wind Turbine 3**. As you remember, its **geoZone** is **Zone B**, and an asset with that name does not exist yet. When the Spark application receives the telemetry from **Wind Turbine 3**, it will detect that the target asset is missing and will create it in ThingsBoard. | ||
|
||
The [**send-randomized-windspeed-3.py**](/docs/samples/analytics/resources/send-randomized-windspeed-3.py) will post telemetry for **Wind Turbine 3** with windSpeed fluctuating between 30 and 60: | ||
|
||
```python | ||
import paho.mqtt.client as mqtt | ||
from time import sleep | ||
import random | ||
|
||
broker="test.mosquitto.org" | ||
topic_pub='v1/devices/me/telemetry' | ||
|
||
client = mqtt.Client() | ||
|
||
client.username_pw_set("$WIND_TURBINE_3_ACCESS_TOKEN") | ||
client.connect('127.0.0.1', 1883, 1) | ||
|
||
while True: | ||
x = random.randrange(30, 61) | ||
print x | ||
msg = '{"windSpeed":"'+ str(x) + '"}' | ||
client.publish(topic_pub, msg) | ||
sleep(0.1) | ||
``` | ||
|
||
Now you should see the new **Zone B** Asset on the Assets screen: | ||
|
||
![image](/images/samples/analytics/spark/zone-b-propagated.png) | ||
|
||
Open the **Zone B** Asset card to check out the telemetry: | ||
|
||
![image](/images/samples/analytics/spark/zone-b-telemetry.png) |
Binary file not shown.