Skip to content

Commit

Permalink
Feat: Add json-query to MQTT monitor type (#3857)
Browse files Browse the repository at this point in the history
* Feat: Add json-query MQTT monitor type

* Fix: Allow result to be null

* Fix: Remove unused parameter

* Chore: Update JSDoc

* Fix: Add default if checkType is not set

---------

Co-authored-by: Louis Lam <[email protected]>
  • Loading branch information
chakflying and louislam authored Dec 2, 2023
1 parent 35479c7 commit 4643261
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 82 deletions.
16 changes: 16 additions & 0 deletions db/knex_migrations/2023-10-08-0000-mqtt-query.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
exports.up = function (knex) {
// Add new column monitor.mqtt_check_type
return knex.schema
.alterTable("monitor", function (table) {
table.string("mqtt_check_type", 255).notNullable().defaultTo("keyword");
});

};

exports.down = function (knex) {
// Drop column monitor.mqtt_check_type
return knex.schema
.alterTable("monitor", function (table) {
table.dropColumn("mqtt_check_type");
});
};
11 changes: 2 additions & 9 deletions server/model/monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { Prometheus } = require("../prometheus");
const { log, UP, DOWN, PENDING, MAINTENANCE, flipStatus, MAX_INTERVAL_SECOND, MIN_INTERVAL_SECOND,
SQL_DATETIME_FORMAT
} = require("../../src/util");
const { tcping, ping, checkCertificate, checkStatusCode, getTotalClientInRoom, setting, mssqlQuery, postgresQuery, mysqlQuery, mqttAsync, setSetting, httpNtlm, radius, grpcQuery,
const { tcping, ping, checkCertificate, checkStatusCode, getTotalClientInRoom, setting, mssqlQuery, postgresQuery, mysqlQuery, setSetting, httpNtlm, radius, grpcQuery,
redisPingAsync, mongodbPing, kafkaProducerAsync, getOidcTokenClientCredentials, rootCertificatesFingerprints, axiosAbortSignal
} = require("../util-server");
const { R } = require("redbean-node");
Expand Down Expand Up @@ -134,6 +134,7 @@ class Monitor extends BeanModel {
maintenance: await Monitor.isUnderMaintenance(this.id),
mqttTopic: this.mqttTopic,
mqttSuccessMessage: this.mqttSuccessMessage,
mqttCheckType: this.mqttCheckType,
databaseQuery: this.databaseQuery,
authMethod: this.authMethod,
grpcUrl: this.grpcUrl,
Expand Down Expand Up @@ -757,14 +758,6 @@ class Monitor extends BeanModel {
} else {
throw Error("Container State is " + res.data.State.Status);
}
} else if (this.type === "mqtt") {
bean.msg = await mqttAsync(this.hostname, this.mqttTopic, this.mqttSuccessMessage, {
port: this.port,
username: this.mqttUsername,
password: this.mqttPassword,
interval: this.interval,
});
bean.status = UP;
} else if (this.type === "sqlserver") {
let startTime = dayjs().valueOf();

Expand Down
121 changes: 121 additions & 0 deletions server/monitor-types/mqtt.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
const { MonitorType } = require("./monitor-type");
const { log, UP } = require("../../src/util");
const mqtt = require("mqtt");
const jsonata = require("jsonata");

class MqttMonitorType extends MonitorType {

name = "mqtt";

/**
* Run the monitoring check on the MQTT monitor
* @param {Monitor} monitor Monitor to check
* @param {Heartbeat} heartbeat Monitor heartbeat to update
* @param {UptimeKumaServer} server Uptime Kuma server
* @returns {Promise<void>}
*/
async check(monitor, heartbeat, server) {
const receivedMessage = await this.mqttAsync(monitor.hostname, monitor.mqttTopic, {
port: monitor.port,
username: monitor.mqttUsername,
password: monitor.mqttPassword,
interval: monitor.interval,
});

if (monitor.mqttCheckType == null || monitor.mqttCheckType === "") {
// use old default
monitor.mqttCheckType = "keyword";
}

if (monitor.mqttCheckType === "keyword") {
if (receivedMessage != null && receivedMessage.includes(monitor.mqttSuccessMessage)) {
heartbeat.msg = `Topic: ${monitor.mqttTopic}; Message: ${receivedMessage}`;
heartbeat.status = UP;
} else {
throw Error(`Message Mismatch - Topic: ${monitor.mqttTopic}; Message: ${receivedMessage}`);
}
} else if (monitor.mqttCheckType === "json-query") {
const parsedMessage = JSON.parse(receivedMessage);

let expression = jsonata(monitor.jsonPath);

let result = await expression.evaluate(parsedMessage);

if (result?.toString() === monitor.expectedValue) {
heartbeat.msg = "Message received, expected value is found";
heartbeat.status = UP;
} else {
throw new Error("Message received but value is not equal to expected value, value was: [" + result + "]");
}
} else {
throw Error("Unknown MQTT Check Type");
}
}

/**
* Connect to MQTT Broker, subscribe to topic and receive message as String
* @param {string} hostname Hostname / address of machine to test
* @param {string} topic MQTT topic
* @param {object} options MQTT options. Contains port, username,
* password and interval (interval defaults to 20)
* @returns {Promise<string>} Received MQTT message
*/
mqttAsync(hostname, topic, options = {}) {
return new Promise((resolve, reject) => {
const { port, username, password, interval = 20 } = options;

// Adds MQTT protocol to the hostname if not already present
if (!/^(?:http|mqtt|ws)s?:\/\//.test(hostname)) {
hostname = "mqtt://" + hostname;
}

const timeoutID = setTimeout(() => {
log.debug("mqtt", "MQTT timeout triggered");
client.end();
reject(new Error("Timeout, Message not received"));
}, interval * 1000 * 0.8);

const mqttUrl = `${hostname}:${port}`;

log.debug("mqtt", `MQTT connecting to ${mqttUrl}`);

let client = mqtt.connect(mqttUrl, {
username,
password
});

client.on("connect", () => {
log.debug("mqtt", "MQTT connected");

try {
client.subscribe(topic, () => {
log.debug("mqtt", "MQTT subscribed to topic");
});
} catch (e) {
client.end();
clearTimeout(timeoutID);
reject(new Error("Cannot subscribe topic"));
}
});

client.on("error", (error) => {
client.end();
clearTimeout(timeoutID);
reject(error);
});

client.on("message", (messageTopic, message) => {
if (messageTopic === topic) {
client.end();
clearTimeout(timeoutID);
resolve(message.toString("utf8"));
}
});

});
}
}

module.exports = {
MqttMonitorType,
};
1 change: 1 addition & 0 deletions server/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ let needSetup = false;
bean.mqttPassword = monitor.mqttPassword;
bean.mqttTopic = monitor.mqttTopic;
bean.mqttSuccessMessage = monitor.mqttSuccessMessage;
bean.mqttCheckType = monitor.mqttCheckType;
bean.databaseConnectionString = monitor.databaseConnectionString;
bean.databaseQuery = monitor.databaseQuery;
bean.authMethod = monitor.authMethod;
Expand Down
2 changes: 2 additions & 0 deletions server/uptime-kuma-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class UptimeKumaServer {
UptimeKumaServer.monitorTypeList["real-browser"] = new RealBrowserMonitorType();
UptimeKumaServer.monitorTypeList["tailscale-ping"] = new TailscalePing();
UptimeKumaServer.monitorTypeList["dns"] = new DnsMonitorType();
UptimeKumaServer.monitorTypeList["mqtt"] = new MqttMonitorType();

this.io = new Server(this.httpServer);
}
Expand Down Expand Up @@ -436,3 +437,4 @@ module.exports = {
const { RealBrowserMonitorType } = require("./monitor-types/real-browser-monitor-type");
const { TailscalePing } = require("./monitor-types/tailscale-ping");
const { DnsMonitorType } = require("./monitor-types/dns");
const { MqttMonitorType } = require("./monitor-types/mqtt");
68 changes: 0 additions & 68 deletions server/util-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ const { Resolver } = require("dns");
const childProcess = require("child_process");
const iconv = require("iconv-lite");
const chardet = require("chardet");
const mqtt = require("mqtt");
const chroma = require("chroma-js");
const { badgeConstants } = require("./config");
const mssql = require("mssql");
Expand Down Expand Up @@ -173,73 +172,6 @@ exports.pingAsync = function (hostname, ipv6 = false, size = 56) {
});
};

/**
* MQTT Monitor
* @param {string} hostname Hostname / address of machine to test
* @param {string} topic MQTT topic
* @param {string} okMessage Expected result
* @param {object} options MQTT options. Contains port, username,
* password and interval (interval defaults to 20)
* @returns {Promise<string>} Received MQTT message
*/
exports.mqttAsync = function (hostname, topic, okMessage, options = {}) {
return new Promise((resolve, reject) => {
const { port, username, password, interval = 20 } = options;

// Adds MQTT protocol to the hostname if not already present
if (!/^(?:http|mqtt|ws)s?:\/\//.test(hostname)) {
hostname = "mqtt://" + hostname;
}

const timeoutID = setTimeout(() => {
log.debug("mqtt", "MQTT timeout triggered");
client.end();
reject(new Error("Timeout"));
}, interval * 1000 * 0.8);

const mqttUrl = `${hostname}:${port}`;

log.debug("mqtt", `MQTT connecting to ${mqttUrl}`);

let client = mqtt.connect(mqttUrl, {
username,
password
});

client.on("connect", () => {
log.debug("mqtt", "MQTT connected");

try {
log.debug("mqtt", "MQTT subscribe topic");
client.subscribe(topic);
} catch (e) {
client.end();
clearTimeout(timeoutID);
reject(new Error("Cannot subscribe topic"));
}
});

client.on("error", (error) => {
client.end();
clearTimeout(timeoutID);
reject(error);
});

client.on("message", (messageTopic, message) => {
if (messageTopic === topic) {
client.end();
clearTimeout(timeoutID);
if (okMessage != null && okMessage !== "" && message.toString() !== okMessage) {
reject(new Error(`Message Mismatch - Topic: ${messageTopic}; Message: ${message.toString()}`));
} else {
resolve(`Topic: ${messageTopic}; Message: ${message.toString()}`);
}
}
});

});
};

/**
* Monitor Kafka using Producer
* @param {string[]} brokers List of kafka brokers to connect, host and
Expand Down
4 changes: 2 additions & 2 deletions src/lang/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,8 @@
"Current User": "Current User",
"topic": "Topic",
"topicExplanation": "MQTT topic to monitor",
"successMessage": "Success Message",
"successMessageExplanation": "MQTT message that will be considered as success",
"successKeyword": "Success Keyword",
"successKeywordExplanation": "MQTT Keyword that will be considered as success",
"recent": "Recent",
"Reset Token": "Reset Token",
"Done": "Done",
Expand Down
29 changes: 26 additions & 3 deletions src/pages/EditMonitor.vue
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,33 @@
</div>

<div class="my-3">
<label for="mqttSuccessMessage" class="form-label">MQTT {{ $t("successMessage") }}</label>
<input id="mqttSuccessMessage" v-model="monitor.mqttSuccessMessage" type="text" class="form-control">
<label for="mqttCheckType" class="form-label">MQTT {{ $t("Check Type") }}</label>
<select id="mqttCheckType" v-model="monitor.mqttCheckType" class="form-select" required>
<option value="keyword">{{ $t("Keyword") }}</option>
<option value="json-query">{{ $t("Json Query") }}</option>
</select>
</div>

<div v-if="monitor.mqttCheckType === 'keyword'" class="my-3">
<label for="mqttSuccessKeyword" class="form-label">MQTT {{ $t("successKeyword") }}</label>
<input id="mqttSuccessKeyword" v-model="monitor.mqttSuccessMessage" type="text" class="form-control">
<div class="form-text">
{{ $t("successMessageExplanation") }}
{{ $t("successKeywordExplanation") }}
</div>
</div>

<!-- Json Query -->
<div v-if="monitor.mqttCheckType === 'json-query'" class="my-3">
<label for="jsonPath" class="form-label">{{ $t("Json Query") }}</label>
<input id="jsonPath" v-model="monitor.jsonPath" type="text" class="form-control" required>

<!-- eslint-disable-next-line vue/no-v-html -->
<div class="form-text" v-html="$t('jsonQueryDescription')">
</div>
<br>

<label for="expectedValue" class="form-label">{{ $t("Expected Value") }}</label>
<input id="expectedValue" v-model="monitor.expectedValue" type="text" class="form-control" required>
</div>
</template>

Expand Down Expand Up @@ -914,6 +936,7 @@ const monitorDefaults = {
mqttPassword: "",
mqttTopic: "",
mqttSuccessMessage: "",
mqttCheckType: "keyword",
authMethod: null,
oauth_auth_method: "client_secret_basic",
httpBodyEncoding: "json",
Expand Down

0 comments on commit 4643261

Please sign in to comment.