From 3cb1e08fa52129282361865f5db7fce62e04a7cf Mon Sep 17 00:00:00 2001 From: danecreekphotography Date: Thu, 11 Jun 2020 06:09:54 -0700 Subject: [PATCH] Send processing statistics via MQTT (#173) Fixes #146 --- CHANGELOG.md | 3 +++ src/Trigger.ts | 10 +++++++- src/TriggerManager.ts | 26 +++++++++++++++++++ src/handlers/mqttManager/MqttManager.ts | 34 ++++++++++++++++++++++--- 4 files changed, 68 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ec8d57..312f5be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## Unreleased +- MQTT status messages with statistics are now sent on every received file. The total + number of files received and the number of triggers actually fired are included in + the message payload. Resolves [issue 146](https://github.com/danecreekphotography/node-deepstackai-trigger/issues/146). - Fix typos in the source code. Resolves [issue 170](https://github.com/danecreekphotography/node-deepstackai-trigger/issues/170). - Add a clear message after initialization indicating whether startup was successful. If it wasn't there's now a link to a troubleshooting wiki page for assistance. Resolves diff --git a/src/Trigger.ts b/src/Trigger.ts index f5585ef..400894b 100644 --- a/src/Trigger.ts +++ b/src/Trigger.ts @@ -6,6 +6,7 @@ import * as chokidar from "chokidar"; import * as JSONC from "jsonc-parser"; import * as log from "./Log"; import * as MqttManager from "./handlers/mqttManager/MqttManager"; +import * as triggerManager from "./TriggerManager"; import * as TelegramManager from "./handlers/telegramManager/TelegramManager"; import * as WebRequestHandler from "./handlers/webRequest/WebRequestHandler"; import analyzeImage from "./DeepStack"; @@ -80,6 +81,7 @@ export default class Trigger { * @param stats The stats for the file */ public async processImage(fileName: string, stats: Stats): Promise { + triggerManager.incrementAnalyzedFilesCount(); // Don't process old files if (!this.passesDateTest(fileName, stats)) return; @@ -91,12 +93,18 @@ export default class Trigger { // Check to see if any predictions cause this to activate const triggeredPredictions = this.getTriggeredPredictions(fileName, predictions); - if (!triggeredPredictions) return; + if (!triggeredPredictions) { + MqttManager.publishStatisticsMessage(triggerManager.triggeredCount, triggerManager.analyzedFilesCount); + return; + } + + triggerManager.incrementTriggeredCount(); // Call all the handlers for the trigger await Promise.all([ ...(await WebRequestHandler.processTrigger(fileName, this, triggeredPredictions)), ...(await MqttManager.processTrigger(fileName, this, triggeredPredictions)), + ...(await MqttManager.publishStatisticsMessage(triggerManager.triggeredCount, triggerManager.analyzedFilesCount)), ...(await TelegramManager.processTrigger(fileName, this, triggeredPredictions)), ]); } diff --git a/src/TriggerManager.ts b/src/TriggerManager.ts index c21ed80..897dbd5 100644 --- a/src/TriggerManager.ts +++ b/src/TriggerManager.ts @@ -14,6 +14,18 @@ import WebRequestConfig from "./handlers/webRequest/WebRequestConfig"; import * as fs from "fs"; import Rect from "./Rect"; +/** + * Provides a running total of the number of times an image caused triggers + * to fire. Use the incrementTriggeredCount() method to update the total. + */ +export let triggeredCount = 0; + +/** + * Provides a running total of the number of files analyzed. + * Use the incrementAnalyzedFiles() method to update the total. + */ +export let analyzedFilesCount = 0; + let _triggers: Trigger[]; /** @@ -148,3 +160,17 @@ export function startWatching(): void { export async function stopWatching(): Promise { return Promise.all(_triggers.map(trigger => trigger.stopWatching())); } + +/** + * Adds one to the triggered count total. + */ +export function incrementTriggeredCount(): void { + triggeredCount += 1; +} + +/** + * Adds one to the false positive count total. + */ +export function incrementAnalyzedFilesCount(): void { + analyzedFilesCount += 1; +} diff --git a/src/handlers/mqttManager/MqttManager.ts b/src/handlers/mqttManager/MqttManager.ts index a3a9348..f3f193d 100644 --- a/src/handlers/mqttManager/MqttManager.ts +++ b/src/handlers/mqttManager/MqttManager.ts @@ -115,12 +115,12 @@ export async function processTrigger( return Promise.all( trigger.mqttHandlerConfig?.messages.map(message => { - return publishMessage(fileName, trigger, message, predictions); + return publishDetectionMessage(fileName, trigger, message, predictions); }), ); } -async function publishMessage( +async function publishDetectionMessage( fileName: string, trigger: Trigger, messageConfig: MqttMessageConfig, @@ -141,7 +141,8 @@ async function publishMessage( timers.set(messageConfig.topic, setTimeout(publishOffEvent, messageConfig.offDelay * 1000, messageConfig.topic)); } - const payload = messageConfig.payload + // Build the detection payload + const detectionPayload = messageConfig.payload ? mustacheFormatter.format(messageConfig.payload, fileName, trigger, predictions) : JSON.stringify({ fileName, @@ -150,7 +151,32 @@ async function publishMessage( state: "on", }); - return await mqttClient.publish(messageConfig.topic, payload); + return await mqttClient.publish(messageConfig.topic, detectionPayload); +} + +/** + * Publishes statistics to MQTT + * @param triggerCount Trigger count + * @param analyzedFilesCount False positive count + */ +export async function publishStatisticsMessage( + triggerCount: number, + analyzedFilesCount: number, +): Promise { + // Don't send anything if MQTT isn't enabled + if (!mqttClient) { + return []; + } + + return [ + await mqttClient.publish( + statusTopic, + JSON.stringify({ + triggerCount, + analyzedFilesCount, + }), + ), + ]; } async function publishOffEvent(topic: string): Promise {