Skip to content

Commit

Permalink
Send processing statistics via MQTT (#173)
Browse files Browse the repository at this point in the history
Fixes #146
  • Loading branch information
neilenns authored Jun 11, 2020
1 parent dffddd2 commit 3cb1e08
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 5 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- MQTT status messages with statistics are now sent on every received file. The total
number of files received and the number of triggers actually fired are included in
the message payload. Resolves [issue 146](https://github.com/danecreekphotography/node-deepstackai-trigger/issues/146).
- Fix typos in the source code. Resolves [issue 170](https://github.com/danecreekphotography/node-deepstackai-trigger/issues/170).
- Add a clear message after initialization indicating whether startup was successful.
If it wasn't there's now a link to a troubleshooting wiki page for assistance. Resolves
Expand Down
10 changes: 9 additions & 1 deletion src/Trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import * as chokidar from "chokidar";
import * as JSONC from "jsonc-parser";
import * as log from "./Log";
import * as MqttManager from "./handlers/mqttManager/MqttManager";
import * as triggerManager from "./TriggerManager";
import * as TelegramManager from "./handlers/telegramManager/TelegramManager";
import * as WebRequestHandler from "./handlers/webRequest/WebRequestHandler";
import analyzeImage from "./DeepStack";
Expand Down Expand Up @@ -80,6 +81,7 @@ export default class Trigger {
* @param stats The stats for the file
*/
public async processImage(fileName: string, stats: Stats): Promise<void> {
triggerManager.incrementAnalyzedFilesCount();
// Don't process old files
if (!this.passesDateTest(fileName, stats)) return;

Expand All @@ -91,12 +93,18 @@ export default class Trigger {

// Check to see if any predictions cause this to activate
const triggeredPredictions = this.getTriggeredPredictions(fileName, predictions);
if (!triggeredPredictions) return;
if (!triggeredPredictions) {
MqttManager.publishStatisticsMessage(triggerManager.triggeredCount, triggerManager.analyzedFilesCount);
return;
}

triggerManager.incrementTriggeredCount();

// Call all the handlers for the trigger
await Promise.all([
...(await WebRequestHandler.processTrigger(fileName, this, triggeredPredictions)),
...(await MqttManager.processTrigger(fileName, this, triggeredPredictions)),
...(await MqttManager.publishStatisticsMessage(triggerManager.triggeredCount, triggerManager.analyzedFilesCount)),
...(await TelegramManager.processTrigger(fileName, this, triggeredPredictions)),
]);
}
Expand Down
26 changes: 26 additions & 0 deletions src/TriggerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ import WebRequestConfig from "./handlers/webRequest/WebRequestConfig";
import * as fs from "fs";
import Rect from "./Rect";

/**
* Provides a running total of the number of times an image caused triggers
* to fire. Use the incrementTriggeredCount() method to update the total.
*/
export let triggeredCount = 0;

/**
* Provides a running total of the number of files analyzed.
* Use the incrementAnalyzedFiles() method to update the total.
*/
export let analyzedFilesCount = 0;

let _triggers: Trigger[];

/**
Expand Down Expand Up @@ -148,3 +160,17 @@ export function startWatching(): void {
export async function stopWatching(): Promise<void[]> {
return Promise.all(_triggers.map(trigger => trigger.stopWatching()));
}

/**
* Adds one to the triggered count total.
*/
export function incrementTriggeredCount(): void {
triggeredCount += 1;
}

/**
* Adds one to the false positive count total.
*/
export function incrementAnalyzedFilesCount(): void {
analyzedFilesCount += 1;
}
34 changes: 30 additions & 4 deletions src/handlers/mqttManager/MqttManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,12 @@ export async function processTrigger(

return Promise.all(
trigger.mqttHandlerConfig?.messages.map(message => {
return publishMessage(fileName, trigger, message, predictions);
return publishDetectionMessage(fileName, trigger, message, predictions);
}),
);
}

async function publishMessage(
async function publishDetectionMessage(
fileName: string,
trigger: Trigger,
messageConfig: MqttMessageConfig,
Expand All @@ -141,7 +141,8 @@ async function publishMessage(
timers.set(messageConfig.topic, setTimeout(publishOffEvent, messageConfig.offDelay * 1000, messageConfig.topic));
}

const payload = messageConfig.payload
// Build the detection payload
const detectionPayload = messageConfig.payload
? mustacheFormatter.format(messageConfig.payload, fileName, trigger, predictions)
: JSON.stringify({
fileName,
Expand All @@ -150,7 +151,32 @@ async function publishMessage(
state: "on",
});

return await mqttClient.publish(messageConfig.topic, payload);
return await mqttClient.publish(messageConfig.topic, detectionPayload);
}

/**
* Publishes statistics to MQTT
* @param triggerCount Trigger count
* @param analyzedFilesCount False positive count
*/
export async function publishStatisticsMessage(
triggerCount: number,
analyzedFilesCount: number,
): Promise<MQTT.IPublishPacket[]> {
// Don't send anything if MQTT isn't enabled
if (!mqttClient) {
return [];
}

return [
await mqttClient.publish(
statusTopic,
JSON.stringify({
triggerCount,
analyzedFilesCount,
}),
),
];
}

async function publishOffEvent(topic: string): Promise<IPublishPacket> {
Expand Down

0 comments on commit 3cb1e08

Please sign in to comment.