Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/be/job queue tests #1035

Merged
merged 8 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion Server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import { fileURLToPath } from "url";

import { connectDbAndRunServer } from "./configs/db.js";
import queueRouter from "./routes/queueRoute.js";

//JobQueue service and dependencies
import JobQueue from "./service/jobQueue.js";
import { Queue, Worker } from "bullmq";

//Network service and dependencies
import NetworkService from "./service/networkService.js";
Expand Down Expand Up @@ -157,7 +160,14 @@ const startApp = async () => {
logger
);
const networkService = new NetworkService(db, emailService, axios, ping, logger, http);
const jobQueue = await JobQueue.createJobQueue(db, networkService, settingsService);
const jobQueue = await JobQueue.createJobQueue(
db,
networkService,
settingsService,
logger,
Queue,
Worker
);

const cleanup = async () => {
if (cleaningUp) {
Expand Down
67 changes: 36 additions & 31 deletions Server/service/jobQueue.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import { Queue, Worker, Job } from "bullmq";
const QUEUE_NAME = "monitors";

const JOBS_PER_WORKER = 5;
import logger from "../utils/logger.js";
import { errorMessages, successMessages } from "../utils/messages.js";
const SERVICE_NAME = "JobQueue";
/**
Expand All @@ -19,11 +16,13 @@ class JobQueue {
* @param {SettingsService} settingsService - The settings service
* @throws {Error}
*/
constructor(settingsService) {
const { redisHost, redisPort } = settingsService.getSettings();
constructor(settingsService, logger, Queue, Worker) {
const settings = settingsService.getSettings() || {};

const { redisHost = "127.0.0.1", redisPort = 6379 } = settings;
const connection = {
host: redisHost || "127.0.0.1",
port: redisPort || 6379,
host: redisHost,
port: redisPort,
};
this.connection = connection;
this.queue = new Queue(QUEUE_NAME, {
Expand All @@ -33,6 +32,8 @@ class JobQueue {
this.db = null;
this.networkService = null;
this.settingsService = settingsService;
this.logger = logger;
this.Worker = Worker;
}

/**
Expand All @@ -42,8 +43,15 @@ class JobQueue {
* @returns {Promise<JobQueue>} - Returns a new JobQueue
*
*/
static async createJobQueue(db, networkService, settingsService) {
const queue = new JobQueue(settingsService);
static async createJobQueue(
db,
networkService,
settingsService,
logger,
Queue,
Worker
) {
const queue = new JobQueue(settingsService, logger, Queue, Worker);
try {
queue.db = db;
queue.networkService = networkService;
Expand All @@ -69,7 +77,7 @@ class JobQueue {
* @returns {Worker} The newly created worker
*/
createWorker() {
const worker = new Worker(
const worker = new this.Worker(
QUEUE_NAME,
async (job) => {
try {
Expand All @@ -96,17 +104,16 @@ class JobQueue {
}
return acc;
}, false);

if (!maintenanceWindowActive) {
await this.networkService.getStatus(job);
} else {
logger.info(`Monitor ${monitorId} is in maintenance window`, {
this.logger.info(`Monitor ${monitorId} is in maintenance window`, {
service: SERVICE_NAME,
monitorId,
});
}
} catch (error) {
logger.error(`Error processing job ${job.id}: ${error.message}`, {
this.logger.error(`Error processing job ${job.id}: ${error.message}`, {
service: SERVICE_NAME,
jobId: job.id,
error: error,
Expand Down Expand Up @@ -169,11 +176,9 @@ class JobQueue {
}
return true;
}

if (workerStats.load > JOBS_PER_WORKER) {
// Find out how many more jobs we have than current workers can handle
const excessJobs = workerStats.jobs.length - this.workers.length * JOBS_PER_WORKER;

// Divide by jobs/worker to find out how many workers to add
const workersToAdd = Math.ceil(excessJobs / JOBS_PER_WORKER);
for (let i = 0; i < workersToAdd; i++) {
Expand All @@ -188,18 +193,17 @@ class JobQueue {
const workerCapacity = this.workers.length * JOBS_PER_WORKER;
const excessCapacity = workerCapacity - workerStats.jobs.length;
// Calculate how many workers to remove
const workersToRemove = Math.floor(excessCapacity / JOBS_PER_WORKER);
if (this.workers.length > 5) {
for (let i = 0; i < workersToRemove; i++) {
const worker = this.workers.pop();
try {
await worker.close();
} catch (error) {
// Catch the error instead of throwing it
logger.error(errorMessages.JOB_QUEUE_WORKER_CLOSE, {
service: SERVICE_NAME,
});
}
let workersToRemove = Math.floor(excessCapacity / JOBS_PER_WORKER); // Make sure there are always at least 5
while (workersToRemove > 0 && this.workers.length > 5) {
const worker = this.workers.pop();
workersToRemove--;
try {
await worker.close();
} catch (error) {
// Catch the error instead of throwing it
this.logger.error(errorMessages.JOB_QUEUE_WORKER_CLOSE, {
service: SERVICE_NAME,
});
}
}
return true;
Expand Down Expand Up @@ -282,14 +286,14 @@ class JobQueue {
every: monitor.interval,
});
if (deleted) {
logger.info(successMessages.JOB_QUEUE_DELETE_JOB, {
this.logger.info(successMessages.JOB_QUEUE_DELETE_JOB, {
service: SERVICE_NAME,
jobId: monitor.id,
});
const workerStats = await this.getWorkerStats();
await this.scaleWorkers(workerStats);
} else {
logger.error(errorMessages.JOB_QUEUE_DELETE_JOB, {
this.logger.error(errorMessages.JOB_QUEUE_DELETE_JOB, {
service: SERVICE_NAME,
jobId: monitor.id,
});
Expand All @@ -311,9 +315,10 @@ class JobQueue {
delayed: await this.queue.getDelayedCount(),
repeatableJobs: (await this.queue.getRepeatableJobs()).length,
};
console.log(metrics);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Sweat the Details: Replace console.log with this.logger

Using console.log might slip in under pressure, but switching to this.logger.info keeps your logging consistent and robust.

Apply this diff to make the change:

-    			console.log(metrics);
+    			this.logger.info(metrics, {
+    				service: SERVICE_NAME,
+    			});

Committable suggestion was skipped due to low confidence.

return metrics;
} catch (error) {
logger.error("Failed to retrieve job queue metrics", {
this.logger.error("Failed to retrieve job queue metrics", {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Don't Let Errors Slide Away

In getMetrics, an error is logged but not rethrown or handled further. This could lead to undefined return values. Consider throwing the error or returning a default metrics object to prevent unexpected behavior.

Apply this diff to rethrow the error:

    			this.logger.error("Failed to retrieve job queue metrics", {
    				service: SERVICE_NAME,
    				errorMsg: error.message,
    			});
+               throw error;

Committable suggestion was skipped due to low confidence.

service: SERVICE_NAME,
errorMsg: error.message,
});
Expand Down Expand Up @@ -344,7 +349,7 @@ class JobQueue {
await this.queue.obliterate();
metrics = await this.getMetrics();
console.log(metrics);
logger.info(successMessages.JOB_QUEUE_OBLITERATE, {
this.logger.info(successMessages.JOB_QUEUE_OBLITERATE, {
service: SERVICE_NAME,
});
return true;
Expand Down
Loading