diff --git a/docs/samples/analytics/resources/send-randomized-windspeed.py b/docs/samples/analytics/resources/send-randomized-windspeed-1.py
similarity index 73%
rename from docs/samples/analytics/resources/send-randomized-windspeed.py
rename to docs/samples/analytics/resources/send-randomized-windspeed-1.py
index a30b2d574b..4ccff596da 100644
--- a/docs/samples/analytics/resources/send-randomized-windspeed.py
+++ b/docs/samples/analytics/resources/send-randomized-windspeed-1.py
@@ -5,14 +5,13 @@
 broker="test.mosquitto.org"
 topic_pub='v1/devices/me/telemetry'
 
-
 client = mqtt.Client()
 
-client.username_pw_set("$YOUR_DEVICE_ACCESS_TOKEN")
+client.username_pw_set("$WIND_TURBINE_1_ACCESS_TOKEN")
 client.connect('127.0.0.1', 1883, 1)
 
-for i in range(100):
-    x = random.randrange(20, 100)
+while True:
+    x = random.randrange(30, 36)
     print x
     msg = '{"windSpeed":"'+ str(x) + '"}'
     client.publish(topic_pub, msg)
diff --git a/docs/samples/analytics/resources/send-randomized-windspeed-2.py b/docs/samples/analytics/resources/send-randomized-windspeed-2.py
new file mode 100644
index 0000000000..1a44b8e911
--- /dev/null
+++ b/docs/samples/analytics/resources/send-randomized-windspeed-2.py
@@ -0,0 +1,18 @@
+import paho.mqtt.client as mqtt
+from time import sleep
+import random
+
+broker="test.mosquitto.org"
+topic_pub='v1/devices/me/telemetry'
+
+client = mqtt.Client()
+
+client.username_pw_set("$WIND_TURBINE_2_ACCESS_TOKEN")
+client.connect('127.0.0.1', 1883, 1)
+
+while True:
+    x = random.randrange(45, 51)
+    print x
+    msg = '{"windSpeed":"'+ str(x) + '"}'
+    client.publish(topic_pub, msg)
+    sleep(0.1)
\ No newline at end of file
diff --git a/docs/samples/analytics/resources/send-randomized-windspeed-3.py b/docs/samples/analytics/resources/send-randomized-windspeed-3.py
new file mode 100644
index 0000000000..a515b717ba
--- /dev/null
+++ b/docs/samples/analytics/resources/send-randomized-windspeed-3.py
@@ -0,0 +1,18 @@
+import paho.mqtt.client as mqtt
+from time import sleep
+import random
+
+broker="test.mosquitto.org"
+topic_pub='v1/devices/me/telemetry'
+
+client = mqtt.Client()
+
+client.username_pw_set("$WIND_TURBINE_3_ACCESS_TOKEN")
+client.connect('127.0.0.1', 1883, 1)
+
+while True:
+    x = random.randrange(30, 61)
+    print x
+    msg = '{"windSpeed":"'+ str(x) + '"}'
+    client.publish(topic_pub, msg)
+    sleep(0.1)
\ No newline at end of file
diff --git a/docs/samples/analytics/spark-integration-with-thingsboard.md b/docs/samples/analytics/spark-integration-with-thingsboard.md
index c43f447d6f..eb738bf7aa 100644
--- a/docs/samples/analytics/spark-integration-with-thingsboard.md
+++ b/docs/samples/analytics/spark-integration-with-thingsboard.md
@@ -62,6 +62,8 @@ and use these [**instructions**](/docs/user-guide/ui/plugins/#plugin-import) to
 
 Don't forget to **activate** your new plugin instance by clicking on the corresponding button in plugin details!
 
+![image](/images/samples/analytics/spark/kafka-plugin-activation.png)
+
 ### Step 2. Configuration of Telemetry Forwarding Rule
 
 Now we need to configure the Rule that will be used to push wind speed data from the weather stations to Kafka.
@@ -71,6 +73,8 @@ and use these [**instructions**](/docs/user-guide/ui/rules/#rule-import) to impo
 
 Don't forget to **activate** your new rule instance by clicking on the corresponding button in plugin details!
 
+![image](/images/samples/analytics/spark/kafka-rule-activation.png)
+
 Let's review the main rule configuration below.
 
 ##### Attributes filter
@@ -103,26 +107,23 @@ Topic name in our case is **'weather-stations-data'** and the message is a valid
 
 ### Step 3. Wind Turbine Device Configuration
 
-Let's create a device that we define as 'Wind Turbine 1' and we will send the telemetry data from this device via MQTT:
+Now let us create three devices that we define as **Wind Turbine 1**, **Wind Turbine 2** and **Wind Turbine 3**. We will send the telemetry data from these devices via MQTT later.
+Please, make sure to set the device type to 'WeatherStation' for all three devices:
 
-![image](/images/samples/analytics/spark/wind-turbine-device.png)
+![image](/images/samples/analytics/spark/create-devices.png)
 
-Once added, open the 'Wind Turbine 1' device card and click on 'Copy Access Token' from this device and store it somewhere.
-We'll use this token later in Spark Application for sending analytics results back to ThingsBoard and will refer to it as **$GATEWAY_ACCESS_TOKEN**.
+Once added, for each device open the device card, click on 'Copy Access Token' button and store the token somewhere.
+We'll use these tokens later in Python scripts sending MQTT data to ThingsBoard.
 
 ![image](/images/samples/analytics/spark/wind-turbine-device-details.png)
 
-### Step 4: Asset Configuration
+### Step 4: Create an Asset
 
 Now we have to create an Asset which will receive the aggregated data from Sparkk Application.
-Add new asset on the Assets screen:
+Add new Asset on the Assets screen. Please, make sure to set the Asset type to WeatherStation:
 
 ![image](/images/samples/analytics/spark/create-asset.png)
 
-When asset is created, click on the asset card and copy the asset ID - we will need it in Spark Application:
-
-![image](/images/samples/analytics/spark/copy-asset-id.png)
-
 ## Spark Application
 
 ### Step 5. Download the sample application source code
@@ -162,20 +163,19 @@ Dependencies that are used in the sample project:
 
 ### Step 7. Source code review
 
-Here is a description of particular code snippet from **SparkKafkaAssetStreamingDemoMain** class.
+The Spark Application logic is concentrated mainly in two classes:
+ - **SparkKafkaStreamingDemoMain** is listening to the Kafka topic and calculates the averages
+ - **RestClient** finds the ThingsBoard Asset by name and pushes the aggregated data to it.
+Below is a description of particular code snippet from **SparkKafkaStreamingDemoMain** class.
 Main constants are listed below:
 
 ```java
 // Kafka brokers URL for Spark Streaming to connect and fetched messages from.
 private static final String KAFKA_BROKER_LIST = "localhost:9092";
-// URL of ThingsBoard REST endpoint
-private static final String THINGSBOARD_REST_ENDPOINT = "http://localhost:8080";
-// ThingsBoard User login
-private static final String USERNAME = "tenant@thingsboard.org";
-// ThingsBoard User password
-private static final String PASSWORD = "tenant";
-// Asset ID to post the aggregated data inot
-private static final String ASSET_ID = "$ASSET_ID";
+// Time interval in milliseconds of Spark Streaming Job, 10 seconds by default.
+private static final int STREAM_WINDOW_MILLISECONDS = 10000; // 10 seconds
+// Kafka telemetry topic to subscribe to. This should match to the topic in the rule action.
+private static final Collection TOPICS = Arrays.asList("weather-stations-data");
 ```
 
 Main processing logic is listed below:
@@ -184,8 +184,6 @@ Main processing logic is listed below:
 
 try (JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(STREAM_WINDOW_MILLISECONDS))) {
 
-    loginRestTemplate();
-
     JavaInputDStream> stream =
             KafkaUtils.createDirectStream(
                     ssc,
@@ -195,17 +193,16 @@ try (JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(STRE
 
 stream.foreachRDD(rdd -> {
 
-    // Map incoming JSON to WindSpeedData objects
-
-    JavaRDD windRdd = rdd.map(new WeatherStationDataMapper());
-    // Map WindSpeedData objects by GeoZone
+    // Map incoming JSON to WindSpeedAndGeoZoneData objects
+    JavaRDD windRdd = rdd.map(new WeatherStationDataMapper());
+    // Map WindSpeedAndGeoZoneData objects by GeoZone
     JavaPairRDD windByZoneRdd = windRdd.mapToPair(d -> new Tuple2<>(d.getGeoZone(), new AvgWindSpeedData(d.getWindSpeed())));
     // Reduce all data volume by GeoZone key
     windByZoneRdd = windByZoneRdd.reduceByKey((a, b) -> AvgWindSpeedData.sum(a, b));
-    // Map back to WindSpeedData
-    List aggData = windByZoneRdd.map(t -> new WindSpeedData(t._1, t._2.getAvgValue())).collect();
-    // Push aggregated data to ThingsBoard using Gateway MQTT API
-    publishTelemetryToThingsBoardAsset(aggData);
+    // Map back to WindSpeedAndGeoZoneData
+    List aggData = windByZoneRdd.map(t -> new WindSpeedAndGeoZoneData(t._1, t._2.getAvgValue())).collect();
+    // Push aggregated data to ThingsBoard Asset
+    restClient.sendTelemetryToAsset(aggData);
 });
 
 ssc.start();
@@ -213,36 +210,92 @@
 }
 ```
 
-The following method is responsbile for publishing the telemetry data:
+The next section describes the **RestClient** class.
+Main constants are listed below:
+
+```java
+// ThingsBoard server URL
+private static final String THINGSBOARD_REST_ENDPOINT = "http://localhost:8080";
+// ThingsBoard Create Asset endpoint
+private static final String CREATE_ASSET_ENDPOINT = THINGSBOARD_REST_ENDPOINT + "/api/asset";
+// ThingsBoard Get WeatherStation Assets endpoint
+private static final String GET_ALL_TENANT_ASSETS_ENDPOINT = THINGSBOARD_REST_ENDPOINT + "/api/tenant/assets?limit=100&type=WeatherStation";
+// ThingsBoard Publish Asset telemetry endpoint template
+private static final String PUBLISH_ASSET_TELEMETRY_ENDPOINT = THINGSBOARD_REST_ENDPOINT + "/api/plugins/telemetry/ASSET/${ASSET_ID}/timeseries/values";
+// ThingsBoard User login
+private static final String USERNAME = "tenant@thingsboard.org";
+// ThingsBoard User password
+private static final String PASSWORD = "tenant";
+```
+
+The **getOrCreateAsset** method tries to get the Asset by name from **assetMap**. If no Asset with such name is found, it calls **createAsset** method which propagates a new Asset to ThingsBoard:
 
 ```java
-private void publishTelemetryToThingsBoardAsset(List aggData) throws Exception {
-    HttpHeaders requestHeaders = new HttpHeaders();
-    requestHeaders.add("X-Authorization", "Bearer " + token);
-
-    if (!aggData.isEmpty()) {
-        for (WindSpeedData d : aggData) {
-            HttpEntity httpEntity = new HttpEntity(d, requestHeaders);
-            ResponseEntity result = restTemplate.postForEntity(ASSET_PUBLISH_TELEMETRY_ENDPOINT,
-                    httpEntity, Void.class);
+private Asset getOrCreateAsset(String assetName) {
+    Asset asset = assetMap.get(assetName);
+    if (asset == null) {
+        asset = createAsset(assetName);
+        assetMap.put(assetName, asset);
+    }
+    return asset;
+}
+
+private Asset createAsset(String assetName) {
+    Asset asset = new Asset();
+    asset.setName(assetName);
+    asset.setType(WEATHER_STATION);
+    HttpHeaders requestHeaders = getHttpHeaders();
+    HttpEntity httpEntity = new HttpEntity<>(asset, requestHeaders);
+    ResponseEntity responseEntity = restTemplate.postForEntity(CREATE_ASSET_ENDPOINT, httpEntity, Asset.class);
+    return responseEntity.getBody();
+}
+```
+
+The following method iterates over the aggregated data list, strips off the **geoZone** attribute and sends the aggregated **windSpeed** to the Asset:
+
+```java
+public void sendTelemetryToAsset(List aggData) {
+    if (aggData.isEmpty()) {
+        return;
+    }
+    for (WindSpeedAndGeoZoneData windSpeedGeoZoneata : aggData) {
+        String assetName = windSpeedGeoZoneata.getGeoZone();
+        if (StringUtils.isEmpty(assetName)) {
+            return;
         }
+        Asset asset = getOrCreateAsset(assetName);
+        HttpHeaders requestHeaders = getHttpHeaders();
+        HttpEntity httpEntity = new HttpEntity(new WindSpeedData(windSpeedGeoZoneata.getWindSpeed()), requestHeaders);
+        String assetPublishEndpoint = getAssetPublishEndpoint(asset.getId().getId());
+        restTemplate.postForEntity(assetPublishEndpoint,
+                httpEntity, Void.class);
+    }
 }
 ```
 
-Now let's run **SparkKafkaAssetStreamingDemoMain** class from the IDE or submit it to a Spark cluster. Sample app will be fetching all the messages from Kafka topic and send average wind speed telemetry to the appropriate **'Zone A Asset'** in *ThingsBoard*.
+Now let's run **SparkKafkaStreamingDemoMain** class from the IDE or submit it to a Spark cluster. Sample app will be fetching all the messages from Kafka topic and send average wind speed telemetry to the appropriate Asset in *ThingsBoard*.
 
 ## Dry Run
 
 Once *Kafka Plugin* is configured, *'Analytics Gateway device'* is provisioned and *Spark Streaming Application* is running please start sending **windSpeed** telemetry from different devices.
 
-The following command will provision **deviceType** and **geoZone** attributes. You may change zone to different values for different devices.
+The following commands will provision **deviceType** and **geoZone** attributes for the devices.
+Let us provision the **"geoZone":"Zone A"** for **Wind Turbine 1** and **Wind Turbine 2**:
+
+```bash
+mosquitto_pub -d -h "localhost" -p 1883 -t "v1/devices/me/attributes" -u "$WIND_TURBINE_1_ACCESS_TOKEN" -m '{"deviceType":"WeatherStation", "geoZone":"Zone A"}'
+mosquitto_pub -d -h "localhost" -p 1883 -t "v1/devices/me/attributes" -u "$WIND_TURBINE_2_ACCESS_TOKEN" -m '{"deviceType":"WeatherStation", "geoZone":"Zone A"}'
+```
+
+For **Wind Turbine 3** let us set **"geoZone" to "Zone B"**:
 
 ```bash
-mosquitto_pub -d -h "localhost" -p 1883 -t "v1/devices/me/attributes" -u "$YOUR_DEVICE_ACCESS_TOKEN" -m '{"deviceType":"WeatherStation", "geoZone":"Zone A"}'
+mosquitto_pub -d -h "localhost" -p 1883 -t "v1/devices/me/attributes" -u "$WIND_TURBINE_3_ACCESS_TOKEN" -m '{"deviceType":"WeatherStation", "geoZone":"Zone B"}'
 ```
 
-The following [**send-randomized-windspeed.py**](/docs/samples/analytics/resources/send-randomized-windspeed.py) script will send 100 randomized windSpeed values to the device:
+Now let us send the telemetry for the **Wind Turbine 1**.
+The following [**send-randomized-windspeed-1.py**](/docs/samples/analytics/resources/send-randomized-windspeed-1.py) script will send randomized windSpeed values in the range between 30 and 35 inclusively. Just replace the **$WIND_TURBINE_1_ACCESS_TOKEN** with the actual value or set the environment variable:
 
 ``` python
 import paho.mqtt.client as mqtt
 from time import sleep
@@ -252,20 +305,79 @@ import random
 broker="test.mosquitto.org"
 topic_pub='v1/devices/me/telemetry'
 
-
 client = mqtt.Client()
 
-client.username_pw_set("$YOUR_DEVICE_ACCESS_TOKEN")
+client.username_pw_set("$WIND_TURBINE_1_ACCESS_TOKEN")
 client.connect('127.0.0.1', 1883, 1)
 
-for i in range(100):
-    x = random.randrange(20, 100)
+while True:
+    x = random.randrange(30, 36)
     print x
     msg = '{"windSpeed":"'+ str(x) + '"}'
     client.publish(topic_pub, msg)
     sleep(0.1)
 ```
 
-Once you have sent the telemetry data to ThingsBoard, wait a couple of seconds and then open up the telemetry tab on the asset:
+The **Zone A** Asset will start receiving the aggregated windSpeed telemetry:
 
-![image](/images/samples/analytics/spark/asset-telemetry.png)
+![image](/images/samples/analytics/spark/zone-a-telemetry-1.png)
+
+Let us keep this script running. Now, in a separate terminal window let's run [**send-randomized-windspeed-2.py**](/docs/samples/analytics/resources/send-randomized-windspeed-2.py) script which will start publishing telemetry data for **Wind Turbine 2**. The windSpeed for **Wind Turbine 2** will be fluctuating between 45 and 50:
+
+```python
+import paho.mqtt.client as mqtt
+from time import sleep
+import random
+
+broker="test.mosquitto.org"
+topic_pub='v1/devices/me/telemetry'
+
+client = mqtt.Client()
+
+client.username_pw_set("$WIND_TURBINE_2_ACCESS_TOKEN")
+client.connect('127.0.0.1', 1883, 1)
+
+while True:
+    x = random.randrange(45, 51)
+    print x
+    msg = '{"windSpeed":"'+ str(x) + '"}'
+    client.publish(topic_pub, msg)
+    sleep(0.1)
+```
+
+As soon as both **Wind Turbine 1** and **Wind Turbine 2** have the **geoZone** attribute set to **Zone A**, the Spark application will average the windSpeed values from both devices and push the aggregate to **Zone A** Asset. Now **Zone A** Asset will receive the aggregate windSpeed something closer to the value of 40:
+
+![image](/images/samples/analytics/spark/zone-a-telemetry-2.png)
+
+Now let us push telemetry for **Wind Turbine 3**. As you remember, its **geoZone** is **Zone B**, and currently the asset with such name does not exist. What will happen is when the Spark application will receive the telemetry from **Wind Turbine 3**, it will detect that the target asset is missing and will propagate one to ThingsBoard.
+
+The [**send-randomized-windspeed-3.py**](/docs/samples/analytics/resources/send-randomized-windspeed-3.py) will post telemetry for **Wind Turbine 3** with windSpeed fluctuating between 30 and 60:
+
+```python
+import paho.mqtt.client as mqtt
+from time import sleep
+import random
+
+broker="test.mosquitto.org"
+topic_pub='v1/devices/me/telemetry'
+
+client = mqtt.Client()
+
+client.username_pw_set("$WIND_TURBINE_3_ACCESS_TOKEN")
+client.connect('127.0.0.1', 1883, 1)
+
+while True:
+    x = random.randrange(30, 61)
+    print x
+    msg = '{"windSpeed":"'+ str(x) + '"}'
+    client.publish(topic_pub, msg)
+    sleep(0.1)
+```
+
+Now you should see the new **Zone B** Asset propagated on Assets screen:
+
+![image](/images/samples/analytics/spark/zone-b-propagated.png)
+
+Open the **Zone B** Asset card to check out the telemetry:
+
+![image](/images/samples/analytics/spark/zone-b-telemetry.png)
\ No newline at end of file
diff --git a/images/samples/analytics/spark/asset-telemetry.png b/images/samples/analytics/spark/asset-telemetry.png
deleted file mode 100644
index 9d72db7c03..0000000000
Binary files a/images/samples/analytics/spark/asset-telemetry.png and /dev/null differ
diff --git a/images/samples/analytics/spark/copy-asset-id.png b/images/samples/analytics/spark/copy-asset-id.png
deleted file mode 100644
index abb9e731db..0000000000
Binary files a/images/samples/analytics/spark/copy-asset-id.png and /dev/null differ
diff --git a/images/samples/analytics/spark/create-devices.png b/images/samples/analytics/spark/create-devices.png
new file mode 100644
index 0000000000..45627cb2c4
Binary files /dev/null and b/images/samples/analytics/spark/create-devices.png differ
diff --git a/images/samples/analytics/spark/kafka-temperature-filter.png b/images/samples/analytics/spark/kafka-temperature-filter.png
deleted file mode 100644
index 1b1cb9b0e3..0000000000
Binary files a/images/samples/analytics/spark/kafka-temperature-filter.png and /dev/null differ
diff --git a/images/samples/analytics/spark/wind-turbine-device.png b/images/samples/analytics/spark/wind-turbine-device.png
deleted file mode 100644
index 0c48b546c9..0000000000
Binary files a/images/samples/analytics/spark/wind-turbine-device.png and /dev/null differ
diff --git a/images/samples/analytics/spark/zone-a-telemetry-1.png b/images/samples/analytics/spark/zone-a-telemetry-1.png
new file mode 100644
index 0000000000..ac7e58173e
Binary files /dev/null and b/images/samples/analytics/spark/zone-a-telemetry-1.png differ
diff --git a/images/samples/analytics/spark/zone-a-telemetry-2.png b/images/samples/analytics/spark/zone-a-telemetry-2.png
new file mode 100644
index 0000000000..05c9b7c2a2
Binary files /dev/null and b/images/samples/analytics/spark/zone-a-telemetry-2.png differ
diff --git a/images/samples/analytics/spark/zone-b-propagated.png b/images/samples/analytics/spark/zone-b-propagated.png
new file mode 100644
index 0000000000..846cfba0ea
Binary files /dev/null and b/images/samples/analytics/spark/zone-b-propagated.png differ
diff --git a/images/samples/analytics/spark/zone-b-telemetry.png b/images/samples/analytics/spark/zone-b-telemetry.png
new file mode 100644
index 0000000000..d5777879d2
Binary files /dev/null and b/images/samples/analytics/spark/zone-b-telemetry.png differ
diff --git a/images/samples/analytics/spark/zone-device-telemetry.png b/images/samples/analytics/spark/zone-device-telemetry.png
deleted file mode 100644
index 4e7eb78fff..0000000000
Binary files a/images/samples/analytics/spark/zone-device-telemetry.png and /dev/null differ