diff --git a/core/pipeline-driver/bootstrap.js b/core/pipeline-driver/bootstrap.js index f633611df..59feae353 100644 --- a/core/pipeline-driver/bootstrap.js +++ b/core/pipeline-driver/bootstrap.js @@ -10,7 +10,10 @@ const modules = [ './lib/producer/jobs-producer', './lib/consumer/jobs-consumer', './lib/state/state-factory', - './lib/datastore/storage-factory' + './lib/datastore/storage-factory', + './lib/datastore/graph-store', + './lib/datastore/redis-storage-adapter', + ]; class Bootstrap { diff --git a/core/pipeline-driver/common/consts/componentNames.js b/core/pipeline-driver/common/consts/componentNames.js index b8932eb4d..f14f3fe5e 100644 --- a/core/pipeline-driver/common/consts/componentNames.js +++ b/core/pipeline-driver/common/consts/componentNames.js @@ -3,7 +3,10 @@ const Components = { MAIN: 'Main', JOBS_CONSUMER: 'Job-Consumer', JOBS_PRODUCER: 'Job-Producer', - TASK_RUNNER: 'Task-Runner' + TASK_RUNNER: 'Task-Runner', + REDIS_PERSISTENT: 'Redis-Persistent', + GRAPH_STORE: 'Graph-Store' + }; module.exports = Components; diff --git a/core/pipeline-driver/lib/consts/graph-storage-types.js b/core/pipeline-driver/lib/consts/graph-storage-types.js new file mode 100644 index 000000000..75ac88ad6 --- /dev/null +++ b/core/pipeline-driver/lib/consts/graph-storage-types.js @@ -0,0 +1,26 @@ +const groupTypes = { + STATUS: { + NOT_STARTED: 'notStarted', + RUNNING: 'running', + COMPLETED: 'completed', + }, + BATCH: { + NOT_STARTED: 'batchNotStarted', + RUNNING: 'batchRunning', + COMPLETED: 'batchCompleted', + }, + SINGLE: { + NOT_STARTED: 'notStarted', + RUNNING: 'running', + COMPLETED: 'completed', + }, + EDGE: { + WAIT_ANY: 'waitAny', + NONE: 'none' + } +}; + + +module.exports = { + groupTypes +}; diff --git a/core/pipeline-driver/lib/datastore/graph-store.js b/core/pipeline-driver/lib/datastore/graph-store.js new file mode 100644 index 000000000..a9cdaa724 --- /dev/null +++ b/core/pipeline-driver/lib/datastore/graph-store.js @@ -0,0 +1,144 @@ + +const States = require('../state/NodeStates'); +const RedisStorage = require('./redis-storage-adapter'); +const { groupTypes } = require('../consts/graph-storage-types'); +const log = require('@hkube/logger').GetLogFromContainer(); +const components = require('../../common/consts/componentNames'); +class GraphStore { + constructor() { + this.nodesMap = null; + this.INTERVAL = 4000; + this.currentJobID = null; + this.started = false; + this._filterData = this.filterData.bind(this); + } + async init(options) { + this.options = options; + } + + start(jobId, nodeMap) { + this.currentJobID = jobId; + this.nodesMap = nodeMap; + RedisStorage.setJobId(jobId); + this.started = true; + this.store(); + } + stop() { + this.started = false; + this.currentJobID = null; + this.nodesMap = null; + } + store() { + setTimeout(async () => { + try { + if (this.started) { + await this._store(); + } + } + catch (error) { + log.error(error, { component: components.GRAPH_STORE }); + } + finally { + if (this.started) { + this.store(); + } + } + }, this.INTERVAL); + } + _store() { + return new Promise(async (resolve, reject) => { + try { + if (!this.nodesMap) { + return reject(new Error('nodeMap not referenced')); + } + this.caluclatedGraph = this.nodesMap.getJSONGraph(this._filterData); + await RedisStorage.put(this.caluclatedGraph); + return resolve(); + } + catch (error) { + return reject(new Error(`faild on storing graph to redis error:${error}`)); + } + }); + } + filterData(graph) { + const { EDGE } = groupTypes; + const adaptedGraph = { + jobId: this.currentJobID, + graph: { + edges: [], + nodes: [], + } + }; + adaptedGraph.graph.edges = graph.edges.map(e => ({ from: e.v, to: e.w, group: e.value[0].type === EDGE.WAIT_ANY ? EDGE.WAIT_ANY : EDGE.NONE })); + adaptedGraph.graph.nodes = graph.nodes.map(n => this._handleNode(n.value)); + return adaptedGraph; + } + _handleNode(node) { + if (node.batch.length === 0) { + return this._handleSingle(node); + } + return this._handleBatch(node); + } + _handleSingle(node) { + const { SINGLE } = groupTypes; + const calculatedNode = { id: node.nodeName, label: node.nodeName, extra: {}, group: SINGLE.NOT_STARTED }; + calculatedNode.group = this._singleStatus(node.status); + return calculatedNode; + } + + _handleBatch(node) { + const { BATCH } = groupTypes; + const calculatedNode = { id: node.nodeName, label: node.nodeName, extra: {}, group: BATCH.NOT_STARTED }; + const batchStatus = this._batchStatusCounter(node); + if (batchStatus.completed === node.batch.length) { + calculatedNode.extra.batch = `${node.batch.length}/${node.batch.length}`; + calculatedNode.group = BATCH.COMPLETED; + } + else if (batchStatus.idle === node.batch.length) { + calculatedNode.extra.batch = `0/${node.batch.length}`; + calculatedNode.group = BATCH.NOT_STARTED; + } + else { + calculatedNode.extra.batch = `${batchStatus.running}/${node.batch.length}`; + calculatedNode.group = BATCH.RUNNING; + } + return calculatedNode; + } + + + _batchStatusCounter(node) { + const batchState = { + idle: 0, + completed: 0, + running: 0, + }; + + node.batch.forEach((b) => { + const { STATUS } = groupTypes; + const status = this._singleStatus(b.status); + if (status === STATUS.COMPLETED) { + batchState.completed += 1; + } + else if (status === STATUS.NOT_STARTED) { + batchState.idle += 1; + } + else { + batchState.running += 1; + } + }); + return batchState; + } + _singleStatus(s) { + const { STATUS } = groupTypes; + if (s === States.SUCCEED || s === States.FAILED) { + return STATUS.COMPLETED; + } + else if (s === States.CREATING || s === States.PENDING) { + return STATUS.NOT_STARTED; + } + return STATUS.RUNNING; + } +} + + +module.exports = new GraphStore(); diff --git a/core/pipeline-driver/lib/datastore/redis-storage-adapter.js b/core/pipeline-driver/lib/datastore/redis-storage-adapter.js new file mode 100644 index 000000000..9fc964a3f --- /dev/null +++ b/core/pipeline-driver/lib/datastore/redis-storage-adapter.js @@ -0,0 +1,74 @@ +const { Factory } = require('@hkube/redis-utils'); +const pathLib = require('path'); +const log = require('@hkube/logger').GetLogFromContainer(); +const components = require('../../common/consts/componentNames'); + + +class RedisAdapter { + constructor() { + this.PREFIX_PATH = 'pipeline-driver/graph'; + this._isInit = false; + this.client = null; + this.currentJobId = ''; + this.path = null; + } + + async init(options) { + if (!this._isInit) { + this.client = Factory.getClient(options.redis); + this._isInit = true; + log.info('redis initiated', { component: components.REDIS_PERSISTENT }); + } + } + + setJobId(jobid) { + this.currentJobId = jobid; + this.path = pathLib.join('/', this.PREFIX_PATH, this.currentJobId); + } + async put(options) { + return this._set(options); + } + + _set(data) { + return new Promise((resolve, reject) => { // eslint-disable-line + if (!this.path) { + return reject(new Error('path not set')); + } + this.client.set(this.path, JSON.stringify(data), (err) => { + if (err) { + return reject(err); + } + return resolve(true); + }); + }); + } + + async get() { + return this._get(); + } + + _get() { + return new Promise((resolve, reject) => { + this.client.get(this.path, (err, res) => { + if (err) { + return reject(err); + } + return resolve(this._tryParseJSON(res)); + }); + }); + } + + + _tryParseJSON(json) { + let parsed = json; + try { + parsed = JSON.parse(json); + } + catch (e) { + log.warn(`fail to parse json ${json} `, { component: components.REDIS_PERSISTENT }); + } + return parsed; + } +} + +module.exports = new RedisAdapter(); diff --git a/core/pipeline-driver/lib/nodes/nodes-map.js b/core/pipeline-driver/lib/nodes/nodes-map.js index e0044d892..50305d796 100644 --- a/core/pipeline-driver/lib/nodes/nodes-map.js +++ b/core/pipeline-driver/lib/nodes/nodes-map.js @@ -1,4 +1,5 @@ const EventEmitter = require('events'); +const graphlib = require('graphlib'); const { Graph } = require('graphlib'); const deepExtend = require('deep-extend'); const GroupBy = require('../helpers/group-by'); @@ -280,6 +281,10 @@ class NodesMap extends EventEmitter { return states.every(this._isCompleted); } + getJSONGraph(filterFunction) { + return filterFunction(graphlib.json.write(this._graph)); + } + getAllNodes() { const nodes = this._graph.nodes(); return nodes.map(n => this._graph.node(n)); diff --git a/core/pipeline-driver/lib/tasks/task-runner.js b/core/pipeline-driver/lib/tasks/task-runner.js index 3756a4b1a..051a3ba7c 100644 --- a/core/pipeline-driver/lib/tasks/task-runner.js +++ b/core/pipeline-driver/lib/tasks/task-runner.js @@ -12,6 +12,7 @@ const log = require('@hkube/logger').GetLogFromContainer(); const component = require('../../common/consts/componentNames').TASK_RUNNER; const { metricsNames } = require('../consts/metricsNames'); const { tracer, metrics, utils } = require('@hkube/metrics'); +const graphStore = require('../datastore/graph-store'); metrics.addTimeMeasure({ name: metricsNames.pipelines_net, @@ -109,6 +110,8 @@ class TaskRunner { this._runNode(node.nodeName, node.parentOutput, node.index); }); this._progress = new Progress({ calcProgress: this._nodes.calcProgress, sendProgress: this._stateManager.setJobStatus }); + graphStore.start(job.data.jobId, this._nodes); + await this._stateManager.watchTasks({ jobId: this._jobId }); const watchState = await this._stateManager.watchJobState({ jobId: this._jobId }); @@ -195,6 +198,7 @@ class TaskRunner { this._job = null; this._jobId = null; this._stateManager.clean(); + graphStore.stop(); this._stateManager = null; this._progress = null; }