-
Notifications
You must be signed in to change notification settings - Fork 204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat/be/job queue tests #1035
Feat/be/job queue tests #1035
Changes from all commits
56a4b21
3f90bb1
fc85d2b
2eea171
5066580
2fcdb20
1c2d758
47946cb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -13,6 +13,7 @@ class NetworkService { | |||||||||
this.setBaseUrl(baseURL); | ||||||||||
this.unsubscribe = store.subscribe(() => { | ||||||||||
const state = store.getState(); | ||||||||||
console.log(state.settings.apiBaseUrl); | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove or guard the sensitive URL logging statement. The logging of -console.log(state.settings.apiBaseUrl);
+if (process.env.NODE_ENV === 'development') {
+ console.log('API Base URL:', state.settings.apiBaseUrl);
+} 📝 Committable suggestion
Suggested change
|
||||||||||
if (BASE_URL !== undefined) { | ||||||||||
baseURL = BASE_URL; | ||||||||||
} else if (state?.settings?.apiBaseUrl ?? null) { | ||||||||||
|
@@ -87,48 +88,48 @@ class NetworkService { | |||||||||
}, | ||||||||||
}); | ||||||||||
} | ||||||||||
/** | ||||||||||
* | ||||||||||
* ************************************ | ||||||||||
* Check the endpoint resolution | ||||||||||
* ************************************ | ||||||||||
* | ||||||||||
* @async | ||||||||||
* @param {Object} config - The configuration object. | ||||||||||
* @param {string} config.authToken - The authorization token to be used in the request header. | ||||||||||
* @param {Object} config.monitorURL - The monitor url to be sent in the request body. | ||||||||||
* @returns {Promise<AxiosResponse>} The response from the axios POST request. | ||||||||||
*/ | ||||||||||
async checkEndpointResolution(config) { | ||||||||||
const { authToken, monitorURL } = config; | ||||||||||
const params = new URLSearchParams(); | ||||||||||
if (monitorURL) params.append("monitorURL", monitorURL); | ||||||||||
|
||||||||||
return this.axiosInstance.get(`/monitors/resolution/url?${params.toString()}`, { | ||||||||||
headers: { | ||||||||||
Authorization: `Bearer ${authToken}`, | ||||||||||
"Content-Type": "application/json", | ||||||||||
} | ||||||||||
}) | ||||||||||
} | ||||||||||
|
||||||||||
/** | ||||||||||
* | ||||||||||
* ************************************ | ||||||||||
* Gets monitors and summary of stats by TeamID | ||||||||||
* ************************************ | ||||||||||
* | ||||||||||
* @async | ||||||||||
* @param {Object} config - The configuration object. | ||||||||||
* @param {string} config.authToken - The authorization token to be used in the request header. | ||||||||||
* @param {string} config.teamId - Team ID | ||||||||||
* @param {Array<string>} config.types - Array of monitor types | ||||||||||
* @returns {Promise<AxiosResponse>} The response from the axios POST request. | ||||||||||
*/ | ||||||||||
async getMonitorsAndSummaryByTeamId(config) { | ||||||||||
const params = new URLSearchParams(); | ||||||||||
|
||||||||||
/** | ||||||||||
* | ||||||||||
* ************************************ | ||||||||||
* Check the endpoint resolution | ||||||||||
* ************************************ | ||||||||||
* | ||||||||||
* @async | ||||||||||
* @param {Object} config - The configuration object. | ||||||||||
* @param {string} config.authToken - The authorization token to be used in the request header. | ||||||||||
* @param {Object} config.monitorURL - The monitor url to be sent in the request body. | ||||||||||
* @returns {Promise<AxiosResponse>} The response from the axios POST request. | ||||||||||
*/ | ||||||||||
async checkEndpointResolution(config) { | ||||||||||
const { authToken, monitorURL } = config; | ||||||||||
const params = new URLSearchParams(); | ||||||||||
|
||||||||||
if (monitorURL) params.append("monitorURL", monitorURL); | ||||||||||
|
||||||||||
return this.axiosInstance.get(`/monitors/resolution/url?${params.toString()}`, { | ||||||||||
headers: { | ||||||||||
Authorization: `Bearer ${authToken}`, | ||||||||||
"Content-Type": "application/json", | ||||||||||
}, | ||||||||||
}); | ||||||||||
} | ||||||||||
|
||||||||||
/** | ||||||||||
* | ||||||||||
* ************************************ | ||||||||||
* Gets monitors and summary of stats by TeamID | ||||||||||
* ************************************ | ||||||||||
* | ||||||||||
* @async | ||||||||||
* @param {Object} config - The configuration object. | ||||||||||
* @param {string} config.authToken - The authorization token to be used in the request header. | ||||||||||
* @param {string} config.teamId - Team ID | ||||||||||
* @param {Array<string>} config.types - Array of monitor types | ||||||||||
* @returns {Promise<AxiosResponse>} The response from the axios POST request. | ||||||||||
*/ | ||||||||||
async getMonitorsAndSummaryByTeamId(config) { | ||||||||||
const params = new URLSearchParams(); | ||||||||||
|
||||||||||
if (config.types) { | ||||||||||
config.types.forEach((type) => { | ||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,5 @@ | ||
import { Queue, Worker, Job } from "bullmq"; | ||
const QUEUE_NAME = "monitors"; | ||
|
||
const JOBS_PER_WORKER = 5; | ||
import logger from "../utils/logger.js"; | ||
import { errorMessages, successMessages } from "../utils/messages.js"; | ||
const SERVICE_NAME = "JobQueue"; | ||
/** | ||
|
@@ -19,11 +16,13 @@ class JobQueue { | |
* @param {SettingsService} settingsService - The settings service | ||
* @throws {Error} | ||
*/ | ||
constructor(settingsService) { | ||
const { redisHost, redisPort } = settingsService.getSettings(); | ||
constructor(settingsService, logger, Queue, Worker) { | ||
const settings = settingsService.getSettings() || {}; | ||
|
||
const { redisHost = "127.0.0.1", redisPort = 6379 } = settings; | ||
const connection = { | ||
host: redisHost || "127.0.0.1", | ||
port: redisPort || 6379, | ||
host: redisHost, | ||
port: redisPort, | ||
}; | ||
this.connection = connection; | ||
this.queue = new Queue(QUEUE_NAME, { | ||
|
@@ -33,6 +32,8 @@ class JobQueue { | |
this.db = null; | ||
this.networkService = null; | ||
this.settingsService = settingsService; | ||
this.logger = logger; | ||
this.Worker = Worker; | ||
} | ||
|
||
/** | ||
|
@@ -42,8 +43,15 @@ class JobQueue { | |
* @returns {Promise<JobQueue>} - Returns a new JobQueue | ||
* | ||
*/ | ||
static async createJobQueue(db, networkService, settingsService) { | ||
const queue = new JobQueue(settingsService); | ||
static async createJobQueue( | ||
db, | ||
networkService, | ||
settingsService, | ||
logger, | ||
Queue, | ||
Worker | ||
) { | ||
const queue = new JobQueue(settingsService, logger, Queue, Worker); | ||
try { | ||
queue.db = db; | ||
queue.networkService = networkService; | ||
|
@@ -69,7 +77,7 @@ class JobQueue { | |
* @returns {Worker} The newly created worker | ||
*/ | ||
createWorker() { | ||
const worker = new Worker( | ||
const worker = new this.Worker( | ||
QUEUE_NAME, | ||
async (job) => { | ||
try { | ||
|
@@ -96,17 +104,16 @@ class JobQueue { | |
} | ||
return acc; | ||
}, false); | ||
|
||
if (!maintenanceWindowActive) { | ||
await this.networkService.getStatus(job); | ||
} else { | ||
logger.info(`Monitor ${monitorId} is in maintenance window`, { | ||
this.logger.info(`Monitor ${monitorId} is in maintenance window`, { | ||
service: SERVICE_NAME, | ||
monitorId, | ||
}); | ||
} | ||
} catch (error) { | ||
logger.error(`Error processing job ${job.id}: ${error.message}`, { | ||
this.logger.error(`Error processing job ${job.id}: ${error.message}`, { | ||
service: SERVICE_NAME, | ||
jobId: job.id, | ||
error: error, | ||
|
@@ -169,11 +176,9 @@ class JobQueue { | |
} | ||
return true; | ||
} | ||
|
||
if (workerStats.load > JOBS_PER_WORKER) { | ||
// Find out how many more jobs we have than current workers can handle | ||
const excessJobs = workerStats.jobs.length - this.workers.length * JOBS_PER_WORKER; | ||
|
||
// Divide by jobs/worker to find out how many workers to add | ||
const workersToAdd = Math.ceil(excessJobs / JOBS_PER_WORKER); | ||
for (let i = 0; i < workersToAdd; i++) { | ||
|
@@ -188,18 +193,17 @@ class JobQueue { | |
const workerCapacity = this.workers.length * JOBS_PER_WORKER; | ||
const excessCapacity = workerCapacity - workerStats.jobs.length; | ||
// Calculate how many workers to remove | ||
const workersToRemove = Math.floor(excessCapacity / JOBS_PER_WORKER); | ||
if (this.workers.length > 5) { | ||
for (let i = 0; i < workersToRemove; i++) { | ||
const worker = this.workers.pop(); | ||
try { | ||
await worker.close(); | ||
} catch (error) { | ||
// Catch the error instead of throwing it | ||
logger.error(errorMessages.JOB_QUEUE_WORKER_CLOSE, { | ||
service: SERVICE_NAME, | ||
}); | ||
} | ||
let workersToRemove = Math.floor(excessCapacity / JOBS_PER_WORKER); // Make sure there are always at least 5 | ||
while (workersToRemove > 0 && this.workers.length > 5) { | ||
const worker = this.workers.pop(); | ||
workersToRemove--; | ||
try { | ||
await worker.close(); | ||
} catch (error) { | ||
// Catch the error instead of throwing it | ||
this.logger.error(errorMessages.JOB_QUEUE_WORKER_CLOSE, { | ||
service: SERVICE_NAME, | ||
}); | ||
} | ||
} | ||
return true; | ||
|
@@ -282,14 +286,14 @@ class JobQueue { | |
every: monitor.interval, | ||
}); | ||
if (deleted) { | ||
logger.info(successMessages.JOB_QUEUE_DELETE_JOB, { | ||
this.logger.info(successMessages.JOB_QUEUE_DELETE_JOB, { | ||
service: SERVICE_NAME, | ||
jobId: monitor.id, | ||
}); | ||
const workerStats = await this.getWorkerStats(); | ||
await this.scaleWorkers(workerStats); | ||
} else { | ||
logger.error(errorMessages.JOB_QUEUE_DELETE_JOB, { | ||
this.logger.error(errorMessages.JOB_QUEUE_DELETE_JOB, { | ||
service: SERVICE_NAME, | ||
jobId: monitor.id, | ||
}); | ||
|
@@ -311,9 +315,10 @@ class JobQueue { | |
delayed: await this.queue.getDelayedCount(), | ||
repeatableJobs: (await this.queue.getRepeatableJobs()).length, | ||
}; | ||
console.log(metrics); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sweat the Details: Replace Using Apply this diff to make the change: - console.log(metrics);
+ this.logger.info(metrics, {
+ service: SERVICE_NAME,
+ });
|
||
return metrics; | ||
} catch (error) { | ||
logger.error("Failed to retrieve job queue metrics", { | ||
this.logger.error("Failed to retrieve job queue metrics", { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't Let Errors Slide Away In Apply this diff to rethrow the error: this.logger.error("Failed to retrieve job queue metrics", {
service: SERVICE_NAME,
errorMsg: error.message,
});
+ throw error;
|
||
service: SERVICE_NAME, | ||
errorMsg: error.message, | ||
}); | ||
|
@@ -344,7 +349,7 @@ class JobQueue { | |
await this.queue.obliterate(); | ||
metrics = await this.getMetrics(); | ||
console.log(metrics); | ||
logger.info(successMessages.JOB_QUEUE_OBLITERATE, { | ||
this.logger.info(successMessages.JOB_QUEUE_OBLITERATE, { | ||
service: SERVICE_NAME, | ||
}); | ||
return true; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mom's spaghetti alert! 🍝 This code needs some cleanup
There's quite a bit to unpack here:
Here's a cleaner implementation: