Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

queues snapshot #1387

Merged
merged 21 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/algorithm-operator/lib/deployments/algorithm-queue.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const clonedeep = require('lodash.clonedeep');
const log = require('@hkube/logger').GetLogFromContainer();
const { applyEnvToContainer, applyResourceRequests, applyImagePullSecret } = require('@hkube/kubernetes-client').utils;
const { applyEnvToContainer, applyResourceRequests, applyImagePullSecret, applyStorage } = require('@hkube/kubernetes-client').utils;
const { applyImage, applyJaeger } = require('../helpers/kubernetes-utils');
const component = require('../consts/componentNames').K8S;
const { algorithmQueueTemplate } = require('../templates/algorithm-queue');
Expand Down Expand Up @@ -57,6 +57,7 @@ const createDeploymentSpec = ({ queueId, versions, registry, clusterOptions, res
spec = applyImage(spec, CONTAINERS.ALGORITHM_QUEUE, versions, registry);
spec = applyJaeger(spec, CONTAINERS.ALGORITHM_QUEUE, options);
spec = applyNodeSelector(spec, clusterOptions);
spec = applyStorage(spec, options.defaultStorage, CONTAINERS.ALGORITHM_QUEUE, 'algorithm-operator-configmap');
if (settings.applyResourceLimits) {
spec = applyResources(spec, resources);
}
Expand Down
29 changes: 28 additions & 1 deletion core/algorithm-operator/lib/templates/algorithm-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,34 @@ const algorithmQueueTemplate = {
key: 'mongodb-database'
}
}
}
},
{
name: 'CLUSTER_NAME',
valueFrom: {
configMapKeyRef: {
name: 'algorithm-operator-configmap',
key: 'CLUSTER_NAME'
}
}
},
{
name: 'DEFAULT_STORAGE',
valueFrom: {
configMapKeyRef: {
name: 'algorithm-operator-configmap',
key: 'DEFAULT_STORAGE'
}
}
},
{
name: 'STORAGE_ENCODING',
valueFrom: {
configMapKeyRef: {
name: 'algorithm-operator-configmap',
key: 'STORAGE_ENCODING'
}
}
},
]
}
]
Expand Down
60 changes: 35 additions & 25 deletions core/algorithm-queue/bootstrap.js
Original file line number Diff line number Diff line change
@@ -1,39 +1,41 @@

const Logger = require('@hkube/logger');
const configIt = require('@hkube/config');
const { main, logger } = configIt.load();
const log = new Logger(main.serviceName, logger);
const monitor = require('@hkube/redis-utils').Monitor;
const { componentName } = require('./lib/consts/index');
const { tracer } = require('@hkube/metrics');
const storageManager = require('@hkube/storage-manager');
const monitor = require('@hkube/redis-utils').Monitor;
const { main: config, logger } = configIt.load();
const log = new Logger(config.serviceName, logger);
const component = require('./lib/consts/component-name').MAIN;
const gracefulShutdown = require('./lib/graceful-shutdown');

const modules = [
require('./lib/persistency/db'),
require('./lib/persistency/etcd'),
require('./lib/queues-manager'),
require('./lib/persistency/redis-storage-adapter'),
require('./lib/metrics/aggregation-metrics-factory')
];

class Bootstrap {
async init() {
async init(bootstrap) {
try {
this._handleErrors();
log.info('running application in ' + configIt.env() + ' environment', { component: componentName.MAIN });
log.info(`running application with env: ${configIt.env()}, version: ${config.version}, node: ${process.versions.node}`, { component });
monitor.on('ready', (data) => {
log.info((data.message).green, { component: componentName.MAIN });
log.info((data.message).green, { component });
});
monitor.on('close', (data) => {
log.error(data.error.message, { component: componentName.MAIN });
log.error(data.error.message, { component });
});
await monitor.check(main.redis);
if (main.tracer) {
await tracer.init(main.tracer);
await monitor.check(config.redis);
if (config.tracer) {
await tracer.init(config.tracer);
}
await storageManager.init(config, log, bootstrap);
for (const m of modules) {
await m.init(main);
await m.init(config);
}
return main;
return config;
}
catch (error) {
this._onInitFailed(error);
Expand All @@ -42,31 +44,39 @@ class Bootstrap {
}

_onInitFailed(error) {
log.error(error.message, { component: componentName.MAIN }, error);
log.error(error.message, { component }, error);
process.exit(1);
}

_handleErrors() {
process.on('exit', (code) => {
log.info('exit' + (code ? ' code ' + code : ''), { component: componentName.MAIN });
log.info(`exit code ${code}`, { component });
});
process.on('SIGINT', () => {
log.info('SIGINT', { component: componentName.MAIN });
process.exit(1);
log.info('SIGINT', { component });
gracefulShutdown.shutdown(() => {
process.exit(0);
});
});
process.on('SIGTERM', () => {
log.info('SIGTERM', { component: componentName.MAIN });
process.exit(1);
log.info('SIGTERM', { component });
gracefulShutdown.shutdown(() => {
process.exit(0);
});
});
process.on('unhandledRejection', (error) => {
log.error('unhandledRejection: ' + error, { component: componentName.MAIN }, error);
process.exit(1);
log.error(`unhandledRejection: ${error.message}`, { component }, error);
gracefulShutdown.shutdown(() => {
process.exit(1);
});
});
process.on('uncaughtException', (error) => {
log.error('uncaughtException: ' + error.message, { component: componentName.MAIN }, error);
process.exit(1);
log.error(`uncaughtException: ${error.message}`, { component }, error);
gracefulShutdown.shutdown(() => {
process.exit(1);
});
});
}
}

module.exports = new Bootstrap();
module.exports = new Bootstrap();
36 changes: 33 additions & 3 deletions core/algorithm-queue/config/main/config.base.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
const packageJson = require(process.cwd() + '/package.json'); // eslint-disable-line
const formatter = require(process.cwd() + '/lib/utils/formatters'); // eslint-disable-line
const config = {};
const heuristicsNames = require('../../lib/consts/heuristics-name');

const config = {};
config.version = packageJson.version;
config.serviceName = packageJson.name;
const useSentinel = !!process.env.REDIS_SENTINEL_SERVICE_HOST;
config.queueId = process.env.QUEUE_ID;
const storageEncoding = process.env.STORAGE_ENCODING || 'bson';
config.defaultStorage = process.env.DEFAULT_STORAGE || 's3';
config.clusterName = process.env.CLUSTER_NAME || 'local';

config.redis = {
host: useSentinel ? process.env.REDIS_SENTINEL_SERVICE_HOST : process.env.REDIS_SERVICE_HOST || 'localhost',
Expand Down Expand Up @@ -55,7 +58,7 @@ config.consumer = {

config.queue = {
updateInterval: process.env.INTERVAL || 1000,
maxPersistencySize: process.env.MAX_PERSISTENCY_SIZE || '1e6'
maxPersistencySize: process.env.MAX_PERSISTENCY_SIZE || '10e6'
};

config.heuristicsWeights = {
Expand Down Expand Up @@ -88,4 +91,31 @@ config.logging = {
tasks: formatter.parseBool(process.env.LOG_TASKS, true)
};

config.scoring = {
maxSize: formatter.parseInt(process.env.MAX_SCORING_SIZE, 5000)
};

config.s3 = {
accessKeyId: process.env.AWS_ACCESS_KEY_ID || 'AKIAIOSFODNN7EXAMPLE',
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY || 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
endpoint: process.env.S3_ENDPOINT_URL || 'http://127.0.0.1:9000'
};

config.fs = {
baseDirectory: process.env.BASE_FS_ADAPTER_DIRECTORY || '/var/tmp/fs/storage'
};

config.storageAdapters = {
s3: {
connection: config.s3,
encoding: storageEncoding,
moduleName: process.env.STORAGE_MODULE || '@hkube/s3-adapter'
},
fs: {
connection: config.fs,
encoding: storageEncoding,
moduleName: process.env.STORAGE_MODULE || '@hkube/fs-adapter'
}
};

module.exports = config;
3 changes: 2 additions & 1 deletion core/algorithm-queue/lib/consts/component-name.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ module.exports = {
BOOTSTRAP: 'BOOTSTRAP',
HEURISTIC_RUNNER: 'HEURISTIC_RUNNER',
MAIN: 'MAIN',
PERSISTENCY: 'PERSISTENCY',
REDIS_PERSISTENT: 'REDIS_PERSISTENT',
ETCD_PERSISTENT: 'ETCD_PERSISTENT',
JOBS_PRODUCER: 'JOBS_PRODUCER',
JOBS_CONSUMER: 'JOBS_CONSUMER',
GRACEFUL_SHUTDOWN: 'GRACEFUL_SHUTDOWN',
AGGREGATION_METRIC: 'AGGREGATION_METRIC',
DB: 'DB'
};
21 changes: 21 additions & 0 deletions core/algorithm-queue/lib/graceful-shutdown.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
const log = require('@hkube/logger').GetLogFromContainer();
const queuesManager = require('./queues-manager');
const component = require('./consts/component-name').GRACEFUL_SHUTDOWN;

class GracefulShutdown {
async shutdown(cb) {
try {
log.info('starting graceful shutdown', { component });
await queuesManager.shutdown();
log.info('finish graceful shutdown', { component });
}
catch (e) {
log.error(`error in graceful shutdown ${e.message}`, { component }, e);
}
finally {
cb();
}
}
}

module.exports = new GracefulShutdown();
8 changes: 4 additions & 4 deletions core/algorithm-queue/lib/heuristic-runner.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const Logger = require('@hkube/logger');
const { taskStatuses } = require('@hkube/consts');
const heuristics = require('./heuristic/index');
const components = require('./consts/component-name');
const component = require('./consts/component-name').HEURISTIC_RUNNER;
const log = Logger.GetLogFromContainer();

class heuristicRunner {
Expand All @@ -15,18 +15,18 @@ class heuristicRunner {
this.heuristicMap.push({ name: heuristic.name, heuristic: heuristic.algorithm(heuristicsWeights[heuristic.name]), weight: heuristicsWeights[heuristic.name] });
}
else {
log.info('couldnt find weight for heuristic ', { component: components.HEURISTIC_RUNNER });
log.info('couldnt find weight for heuristic ', { component });
}
}

run(job) {
let score = 0;
if (job.status !== taskStatuses.PRESCHEDULE) {
log.debug('start running heuristic for ', { component: components.HEURISTIC_RUNNER });
log.debug('start running heuristic for ', { component });
score = this.heuristicMap.reduce((result, algorithm) => {
const heuristicScore = algorithm.heuristic(job);
job.calculated.latestScores[algorithm.name] = heuristicScore; // eslint-disable-line
log.debug(`during score calculation for ${algorithm.name} in ${job.jobId} score:${heuristicScore} calculated:${result + heuristicScore}`, { component: components.HEURISTIC_RUNNER });
log.debug(`during score calculation for ${algorithm.name} in ${job.jobId} score:${heuristicScore} calculated:${result + heuristicScore}`, { component });
return result + heuristicScore;
}, 0);
}
Expand Down
5 changes: 2 additions & 3 deletions core/algorithm-queue/lib/jobs/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ class JobConsumer extends EventEmitter {
}
}

// TODO: remove this calculated stuff....
pipelineToQueueAdapter(jobData, taskData, initialBatchLength) {
_adaptData(jobData, taskData, initialBatchLength) {
const latestScores = Object.values(heuristicsName).reduce((acc, cur) => {
acc[cur] = 0.00001;
return acc;
Expand Down Expand Up @@ -149,7 +148,7 @@ class JobConsumer extends EventEmitter {

queueTasksBuilder(job) {
const { tasks, ...jobData } = job.data;
const taskList = tasks.map(task => this.pipelineToQueueAdapter(jobData, task, tasks.length));
const taskList = tasks.map(task => this._adaptData(jobData, task, tasks.length));
this.emit('jobs-add', taskList);
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/algorithm-queue/lib/jobs/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class JobProducer {
}
let err;
let status;
const maxAttempts = (retry && retry.limit) || MAX_JOB_ATTEMPTS;
const maxAttempts = retry?.limit ?? MAX_JOB_ATTEMPTS;
const task = this._pipelineToQueueAdapter(job.options);
let { attempts } = task;

Expand Down
Loading