Skip to content

Commit

Permalink
queues snapshot (#1387)
Browse files Browse the repository at this point in the history
* fix: redis persist

* feat: improve snapshot

* feat: improve snapshot

* feat: improve snapshot

* feat: improve snapshot

* Merge branch 'master' into alg-queue-snapshot

* feat: improve snapshot

* Merge branch 'alg-queue-snapshot' of github.com:kube-HPC/hkube into alg-queue-snapshot

* feat: add storage envs

* feat: PR changes

* feat: PR changes

* feat: improve persistency

* feat: queues snapshots

* feat: queues snapshot

* feat: queues snapshot

* feat: queues snapshot

* feat: queues snapshot

* Merge branch 'master' into alg-queue-snapshot

* Merge branch 'master' into alg-queue-snapshot

* Merge branch 'master' into alg-queue-snapshot

* Merge branch 'master' into alg-queue-snapshot
  • Loading branch information
nassiharel authored Sep 2, 2021
1 parent b4012dc commit 383394d
Show file tree
Hide file tree
Showing 42 changed files with 3,591 additions and 1,363 deletions.
3 changes: 2 additions & 1 deletion core/algorithm-operator/lib/deployments/algorithm-queue.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const clonedeep = require('lodash.clonedeep');
const log = require('@hkube/logger').GetLogFromContainer();
const { applyEnvToContainer, applyResourceRequests, applyImagePullSecret } = require('@hkube/kubernetes-client').utils;
const { applyEnvToContainer, applyResourceRequests, applyImagePullSecret, applyStorage } = require('@hkube/kubernetes-client').utils;
const { applyImage, applyJaeger } = require('../helpers/kubernetes-utils');
const component = require('../consts/componentNames').K8S;
const { algorithmQueueTemplate } = require('../templates/algorithm-queue');
Expand Down Expand Up @@ -57,6 +57,7 @@ const createDeploymentSpec = ({ queueId, versions, registry, clusterOptions, res
spec = applyImage(spec, CONTAINERS.ALGORITHM_QUEUE, versions, registry);
spec = applyJaeger(spec, CONTAINERS.ALGORITHM_QUEUE, options);
spec = applyNodeSelector(spec, clusterOptions);
spec = applyStorage(spec, options.defaultStorage, CONTAINERS.ALGORITHM_QUEUE, 'algorithm-operator-configmap');
if (settings.applyResourceLimits) {
spec = applyResources(spec, resources);
}
Expand Down
29 changes: 28 additions & 1 deletion core/algorithm-operator/lib/templates/algorithm-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,34 @@ const algorithmQueueTemplate = {
key: 'mongodb-database'
}
}
}
},
{
name: 'CLUSTER_NAME',
valueFrom: {
configMapKeyRef: {
name: 'algorithm-operator-configmap',
key: 'CLUSTER_NAME'
}
}
},
{
name: 'DEFAULT_STORAGE',
valueFrom: {
configMapKeyRef: {
name: 'algorithm-operator-configmap',
key: 'DEFAULT_STORAGE'
}
}
},
{
name: 'STORAGE_ENCODING',
valueFrom: {
configMapKeyRef: {
name: 'algorithm-operator-configmap',
key: 'STORAGE_ENCODING'
}
}
},
]
}
]
Expand Down
60 changes: 35 additions & 25 deletions core/algorithm-queue/bootstrap.js
Original file line number Diff line number Diff line change
@@ -1,39 +1,41 @@

const Logger = require('@hkube/logger');
const configIt = require('@hkube/config');
const { main, logger } = configIt.load();
const log = new Logger(main.serviceName, logger);
const monitor = require('@hkube/redis-utils').Monitor;
const { componentName } = require('./lib/consts/index');
const { tracer } = require('@hkube/metrics');
const storageManager = require('@hkube/storage-manager');
const monitor = require('@hkube/redis-utils').Monitor;
const { main: config, logger } = configIt.load();
const log = new Logger(config.serviceName, logger);
const component = require('./lib/consts/component-name').MAIN;
const gracefulShutdown = require('./lib/graceful-shutdown');

const modules = [
require('./lib/persistency/db'),
require('./lib/persistency/etcd'),
require('./lib/queues-manager'),
require('./lib/persistency/redis-storage-adapter'),
require('./lib/metrics/aggregation-metrics-factory')
];

class Bootstrap {
async init() {
async init(bootstrap) {
try {
this._handleErrors();
log.info('running application in ' + configIt.env() + ' environment', { component: componentName.MAIN });
log.info(`running application with env: ${configIt.env()}, version: ${config.version}, node: ${process.versions.node}`, { component });
monitor.on('ready', (data) => {
log.info((data.message).green, { component: componentName.MAIN });
log.info((data.message).green, { component });
});
monitor.on('close', (data) => {
log.error(data.error.message, { component: componentName.MAIN });
log.error(data.error.message, { component });
});
await monitor.check(main.redis);
if (main.tracer) {
await tracer.init(main.tracer);
await monitor.check(config.redis);
if (config.tracer) {
await tracer.init(config.tracer);
}
await storageManager.init(config, log, bootstrap);
for (const m of modules) {
await m.init(main);
await m.init(config);
}
return main;
return config;
}
catch (error) {
this._onInitFailed(error);
Expand All @@ -42,31 +44,39 @@ class Bootstrap {
}

_onInitFailed(error) {
log.error(error.message, { component: componentName.MAIN }, error);
log.error(error.message, { component }, error);
process.exit(1);
}

_handleErrors() {
process.on('exit', (code) => {
log.info('exit' + (code ? ' code ' + code : ''), { component: componentName.MAIN });
log.info(`exit code ${code}`, { component });
});
process.on('SIGINT', () => {
log.info('SIGINT', { component: componentName.MAIN });
process.exit(1);
log.info('SIGINT', { component });
gracefulShutdown.shutdown(() => {
process.exit(0);
});
});
process.on('SIGTERM', () => {
log.info('SIGTERM', { component: componentName.MAIN });
process.exit(1);
log.info('SIGTERM', { component });
gracefulShutdown.shutdown(() => {
process.exit(0);
});
});
process.on('unhandledRejection', (error) => {
log.error('unhandledRejection: ' + error, { component: componentName.MAIN }, error);
process.exit(1);
log.error(`unhandledRejection: ${error.message}`, { component }, error);
gracefulShutdown.shutdown(() => {
process.exit(1);
});
});
process.on('uncaughtException', (error) => {
log.error('uncaughtException: ' + error.message, { component: componentName.MAIN }, error);
process.exit(1);
log.error(`uncaughtException: ${error.message}`, { component }, error);
gracefulShutdown.shutdown(() => {
process.exit(1);
});
});
}
}

module.exports = new Bootstrap();
module.exports = new Bootstrap();
36 changes: 33 additions & 3 deletions core/algorithm-queue/config/main/config.base.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
const packageJson = require(process.cwd() + '/package.json'); // eslint-disable-line
const formatter = require(process.cwd() + '/lib/utils/formatters'); // eslint-disable-line
const config = {};
const heuristicsNames = require('../../lib/consts/heuristics-name');

const config = {};
config.version = packageJson.version;
config.serviceName = packageJson.name;
const useSentinel = !!process.env.REDIS_SENTINEL_SERVICE_HOST;
config.queueId = process.env.QUEUE_ID;
const storageEncoding = process.env.STORAGE_ENCODING || 'bson';
config.defaultStorage = process.env.DEFAULT_STORAGE || 's3';
config.clusterName = process.env.CLUSTER_NAME || 'local';

config.redis = {
host: useSentinel ? process.env.REDIS_SENTINEL_SERVICE_HOST : process.env.REDIS_SERVICE_HOST || 'localhost',
Expand Down Expand Up @@ -55,7 +58,7 @@ config.consumer = {

config.queue = {
updateInterval: process.env.INTERVAL || 1000,
maxPersistencySize: process.env.MAX_PERSISTENCY_SIZE || '1e6'
maxPersistencySize: process.env.MAX_PERSISTENCY_SIZE || '10e6'
};

config.heuristicsWeights = {
Expand Down Expand Up @@ -88,4 +91,31 @@ config.logging = {
tasks: formatter.parseBool(process.env.LOG_TASKS, true)
};

// Scoring limits. maxSize is read from MAX_SCORING_SIZE with 5000 passed as the fallback argument
// (the exact parsing rules live in lib/utils/formatters, which is not visible here).
config.scoring = {
maxSize: formatter.parseInt(process.env.MAX_SCORING_SIZE, 5000)
};

// S3 connection settings; every field can be overridden through the environment.
// NOTE(review): the fallback key/secret look like placeholder example credentials meant for
// local runs against the 127.0.0.1 endpoint — confirm they are never relied on in a real deployment.
config.s3 = {
accessKeyId: process.env.AWS_ACCESS_KEY_ID || 'AKIAIOSFODNN7EXAMPLE',
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY || 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
endpoint: process.env.S3_ENDPOINT_URL || 'http://127.0.0.1:9000'
};

// File-system storage settings: root directory used by the fs adapter.
config.fs = {
baseDirectory: process.env.BASE_FS_ADAPTER_DIRECTORY || '/var/tmp/fs/storage'
};

// Adapter table keyed by storage type ('s3' / 'fs'); presumably selected via config.defaultStorage — verify against the storage manager.
// Both entries share the same storageEncoding value (STORAGE_ENCODING, defaulting to 'bson').
// NOTE(review): STORAGE_MODULE overrides moduleName for BOTH adapters, so setting it makes
// the s3 and fs entries load the same module — confirm this is intended.
config.storageAdapters = {
s3: {
connection: config.s3,
encoding: storageEncoding,
moduleName: process.env.STORAGE_MODULE || '@hkube/s3-adapter'
},
fs: {
connection: config.fs,
encoding: storageEncoding,
moduleName: process.env.STORAGE_MODULE || '@hkube/fs-adapter'
}
};

module.exports = config;
3 changes: 2 additions & 1 deletion core/algorithm-queue/lib/consts/component-name.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ module.exports = {
BOOTSTRAP: 'BOOTSTRAP',
HEURISTIC_RUNNER: 'HEURISTIC_RUNNER',
MAIN: 'MAIN',
PERSISTENCY: 'PERSISTENCY',
REDIS_PERSISTENT: 'REDIS_PERSISTENT',
ETCD_PERSISTENT: 'ETCD_PERSISTENT',
JOBS_PRODUCER: 'JOBS_PRODUCER',
JOBS_CONSUMER: 'JOBS_CONSUMER',
GRACEFUL_SHUTDOWN: 'GRACEFUL_SHUTDOWN',
AGGREGATION_METRIC: 'AGGREGATION_METRIC',
DB: 'DB'
};
21 changes: 21 additions & 0 deletions core/algorithm-queue/lib/graceful-shutdown.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
const log = require('@hkube/logger').GetLogFromContainer();
const queuesManager = require('./queues-manager');
const component = require('./consts/component-name').GRACEFUL_SHUTDOWN;

/**
 * Coordinates an orderly stop of the service by delegating to the queues
 * manager and then notifying the caller.
 */
class GracefulShutdown {
    /**
     * Runs the shutdown sequence and then invokes `cb`.
     *
     * Calls that arrive while a shutdown is still in progress (e.g. a second
     * signal, or an error handler firing during shutdown) join the run that is
     * already in flight instead of starting `queuesManager.shutdown()` again.
     * Once a run has settled, a later call starts a fresh run.
     *
     * @param {Function} [cb] - invoked once the shutdown attempt has finished,
     *   whether it succeeded or failed. Ignored when it is not a function.
     * @returns {Promise<void>} settles after `cb` has been invoked.
     */
    async shutdown(cb) {
        if (!this._inFlight) {
            // Clear the marker once settled so sequential calls keep their original behavior.
            this._inFlight = this._run().finally(() => {
                this._inFlight = null;
            });
        }
        try {
            await this._inFlight;
        }
        finally {
            // A missing/invalid callback must not throw from inside the shutdown path.
            if (typeof cb === 'function') {
                cb();
            }
        }
    }

    /**
     * Performs one shutdown attempt; failures are logged rather than rethrown
     * so that callers always reach their callback.
     */
    async _run() {
        try {
            log.info('starting graceful shutdown', { component });
            await queuesManager.shutdown();
            log.info('finish graceful shutdown', { component });
        }
        catch (e) {
            log.error(`error in graceful shutdown ${e.message}`, { component }, e);
        }
    }
}

module.exports = new GracefulShutdown();
8 changes: 4 additions & 4 deletions core/algorithm-queue/lib/heuristic-runner.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const Logger = require('@hkube/logger');
const { taskStatuses } = require('@hkube/consts');
const heuristics = require('./heuristic/index');
const components = require('./consts/component-name');
const component = require('./consts/component-name').HEURISTIC_RUNNER;
const log = Logger.GetLogFromContainer();

class heuristicRunner {
Expand All @@ -15,18 +15,18 @@ class heuristicRunner {
this.heuristicMap.push({ name: heuristic.name, heuristic: heuristic.algorithm(heuristicsWeights[heuristic.name]), weight: heuristicsWeights[heuristic.name] });
}
else {
log.info('couldnt find weight for heuristic ', { component: components.HEURISTIC_RUNNER });
log.info('couldnt find weight for heuristic ', { component });
}
}

run(job) {
let score = 0;
if (job.status !== taskStatuses.PRESCHEDULE) {
log.debug('start running heuristic for ', { component: components.HEURISTIC_RUNNER });
log.debug('start running heuristic for ', { component });
score = this.heuristicMap.reduce((result, algorithm) => {
const heuristicScore = algorithm.heuristic(job);
job.calculated.latestScores[algorithm.name] = heuristicScore; // eslint-disable-line
log.debug(`during score calculation for ${algorithm.name} in ${job.jobId} score:${heuristicScore} calculated:${result + heuristicScore}`, { component: components.HEURISTIC_RUNNER });
log.debug(`during score calculation for ${algorithm.name} in ${job.jobId} score:${heuristicScore} calculated:${result + heuristicScore}`, { component });
return result + heuristicScore;
}, 0);
}
Expand Down
5 changes: 2 additions & 3 deletions core/algorithm-queue/lib/jobs/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ class JobConsumer extends EventEmitter {
}
}

// TODO: remove this calculated stuff....
pipelineToQueueAdapter(jobData, taskData, initialBatchLength) {
_adaptData(jobData, taskData, initialBatchLength) {
const latestScores = Object.values(heuristicsName).reduce((acc, cur) => {
acc[cur] = 0.00001;
return acc;
Expand Down Expand Up @@ -149,7 +148,7 @@ class JobConsumer extends EventEmitter {

queueTasksBuilder(job) {
const { tasks, ...jobData } = job.data;
const taskList = tasks.map(task => this.pipelineToQueueAdapter(jobData, task, tasks.length));
const taskList = tasks.map(task => this._adaptData(jobData, task, tasks.length));
this.emit('jobs-add', taskList);
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/algorithm-queue/lib/jobs/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class JobProducer {
}
let err;
let status;
const maxAttempts = (retry && retry.limit) || MAX_JOB_ATTEMPTS;
const maxAttempts = retry?.limit ?? MAX_JOB_ATTEMPTS;
const task = this._pipelineToQueueAdapter(job.options);
let { attempts } = task;

Expand Down
Loading

0 comments on commit 383394d

Please sign in to comment.