Skip to content

Commit

Permalink
Merge pull request #12 from kube-HPC/graphlibe-persistence
Browse files Browse the repository at this point in the history
fix issue with jobid
  • Loading branch information
nassiharel committed Jul 19, 2018
1 parent 99511a5 commit e1b67fa
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 2 deletions.
5 changes: 4 additions & 1 deletion core/pipeline-driver/bootstrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ const modules = [
'./lib/producer/jobs-producer',
'./lib/consumer/jobs-consumer',
'./lib/state/state-factory',
'./lib/datastore/storage-factory'
'./lib/datastore/storage-factory',
'./lib/datastore/graph-store',
'./lib/datastore/redis-storage-adapter',

];

class Bootstrap {
Expand Down
5 changes: 4 additions & 1 deletion core/pipeline-driver/common/consts/componentNames.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ const Components = {
MAIN: 'Main',
JOBS_CONSUMER: 'Job-Consumer',
JOBS_PRODUCER: 'Job-Producer',
TASK_RUNNER: 'Task-Runner'
TASK_RUNNER: 'Task-Runner',
REDIS_PERSISTENT: 'Redis-Persistent',
GRAPH_STORE: 'Graph-Store'

};

module.exports = Components;
26 changes: 26 additions & 0 deletions core/pipeline-driver/lib/consts/graph-storage-types.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
const groupTypes = {
STATUS: {
NOT_STARTED: 'notStarted',
RUNNING: 'running',
COMPLETED: 'completed',
},
BATCH: {
NOT_STARTED: 'batchNotStarted',
RUNNING: 'batchRunning',
COMPLETED: 'batchCompleted',
},
SINGLE: {
NOT_STARTED: 'notStarted',
RUNNING: 'running',
COMPLETED: 'completed',
},
EDGE: {
WAIT_ANY: 'waitAny',
NONE: 'none'
}
};


module.exports = {
groupTypes
};
144 changes: 144 additions & 0 deletions core/pipeline-driver/lib/datastore/graph-store.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@

const States = require('../state/NodeStates');
const RedisStorage = require('./redis-storage-adapter');
const { groupTypes } = require('../consts/graph-storage-types');
const log = require('@hkube/logger').GetLogFromContainer();
const components = require('../../common/consts/componentNames');
class GraphStore {
constructor() {
this.nodesMap = null;
this.INTERVAL = 4000;
this.currentJobID = null;
this.started = false;
this._filterData = this.filterData.bind(this);
}
async init(options) {
this.options = options;
}

start(jobId, nodeMap) {
this.currentJobID = jobId;
this.nodesMap = nodeMap;
RedisStorage.setJobId(jobId);
this.started = true;
this.store();
}
stop() {
this.started = false;
this.currentJobID = null;
this.nodesMap = null;
}
store() {
setTimeout(async () => {
try {
if (this.started) {
await this._store();
}
}
catch (error) {
log.error(error, { component: components.GRAPH_STORE });
}
finally {
if (this.started) {
this.store();
}
}
}, this.INTERVAL);
}
_store() {
return new Promise(async (resolve, reject) => {
try {
if (!this.nodesMap) {
return reject(new Error('nodeMap not referenced'));
}
this.caluclatedGraph = this.nodesMap.getJSONGraph(this._filterData);
await RedisStorage.put(this.caluclatedGraph);
return resolve();
}
catch (error) {
return reject(new Error(`faild on storing graph to redis error:${error}`));
}
});
}
filterData(graph) {
const { EDGE } = groupTypes;
const adaptedGraph = {
jobId: this.currentJobID,
graph: {
edges: [],
nodes: [],
}
};
adaptedGraph.graph.edges = graph.edges.map(e => ({ from: e.v, to: e.w, group: e.value[0].type === EDGE.WAIT_ANY ? EDGE.WAIT_ANY : EDGE.NONE }));
adaptedGraph.graph.nodes = graph.nodes.map(n => this._handleNode(n.value));
return adaptedGraph;
}
_handleNode(node) {
if (node.batch.length === 0) {
return this._handleSingle(node);
}
return this._handleBatch(node);
}
_handleSingle(node) {
const { SINGLE } = groupTypes;
const calculatedNode = { id: node.nodeName, label: node.nodeName, extra: {}, group: SINGLE.NOT_STARTED };
calculatedNode.group = this._singleStatus(node.status);
return calculatedNode;
}

_handleBatch(node) {
const { BATCH } = groupTypes;
const calculatedNode = { id: node.nodeName, label: node.nodeName, extra: {}, group: BATCH.NOT_STARTED };
const batchStatus = this._batchStatusCounter(node);
if (batchStatus.completed === node.batch.length) {
calculatedNode.extra.batch = `${node.batch.length}/${node.batch.length}`;
calculatedNode.group = BATCH.COMPLETED;
}
else if (batchStatus.idle === node.batch.length) {
calculatedNode.extra.batch = `0/${node.batch.length}`;
calculatedNode.group = BATCH.NOT_STARTED;
}
else {
calculatedNode.extra.batch = `${batchStatus.running}/${node.batch.length}`;
calculatedNode.group = BATCH.RUNNING;
}
return calculatedNode;
}


_batchStatusCounter(node) {
const batchState = {
idle: 0,
completed: 0,
running: 0,
};

node.batch.forEach((b) => {
const { STATUS } = groupTypes;
const status = this._singleStatus(b.status);
if (status === STATUS.COMPLETED) {
batchState.completed += 1;
}
else if (status === STATUS.NOT_STARTED) {
batchState.idle += 1;
}
else {
batchState.running += 1;
}
});
return batchState;
}
_singleStatus(s) {
const { STATUS } = groupTypes;
if (s === States.SUCCEED || s === States.FAILED) {
return STATUS.COMPLETED;
}
else if (s === States.CREATING || s === States.PENDING) {
return STATUS.NOT_STARTED;
}
return STATUS.RUNNING;
}
}


module.exports = new GraphStore();
74 changes: 74 additions & 0 deletions core/pipeline-driver/lib/datastore/redis-storage-adapter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
const { Factory } = require('@hkube/redis-utils');
const pathLib = require('path');
const log = require('@hkube/logger').GetLogFromContainer();
const components = require('../../common/consts/componentNames');


class RedisAdapter {
constructor() {
this.PREFIX_PATH = 'pipeline-driver/graph';
this._isInit = false;
this.client = null;
this.currentJobId = '';
this.path = null;
}

async init(options) {
if (!this._isInit) {
this.client = Factory.getClient(options.redis);
this._isInit = true;
log.info('redis initiated', { component: components.REDIS_PERSISTENT });
}
}

setJobId(jobid) {
this.currentJobId = jobid;
this.path = pathLib.join('/', this.PREFIX_PATH, this.currentJobId);
}
async put(options) {
return this._set(options);
}

_set(data) {
return new Promise((resolve, reject) => { // eslint-disable-line
if (!this.path) {
return reject(new Error('path not set'));
}
this.client.set(this.path, JSON.stringify(data), (err) => {
if (err) {
return reject(err);
}
return resolve(true);
});
});
}

async get() {
return this._get();
}

_get() {
return new Promise((resolve, reject) => {
this.client.get(this.path, (err, res) => {
if (err) {
return reject(err);
}
return resolve(this._tryParseJSON(res));
});
});
}


_tryParseJSON(json) {
let parsed = json;
try {
parsed = JSON.parse(json);
}
catch (e) {
log.warn(`fail to parse json ${json} `, { component: components.REDIS_PERSISTENT });
}
return parsed;
}
}

module.exports = new RedisAdapter();
5 changes: 5 additions & 0 deletions core/pipeline-driver/lib/nodes/nodes-map.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const EventEmitter = require('events');
const graphlib = require('graphlib');
const { Graph } = require('graphlib');
const deepExtend = require('deep-extend');
const GroupBy = require('../helpers/group-by');
Expand Down Expand Up @@ -280,6 +281,10 @@ class NodesMap extends EventEmitter {
return states.every(this._isCompleted);
}

getJSONGraph(filterFunction) {
return filterFunction(graphlib.json.write(this._graph));
}

getAllNodes() {
const nodes = this._graph.nodes();
return nodes.map(n => this._graph.node(n));
Expand Down
4 changes: 4 additions & 0 deletions core/pipeline-driver/lib/tasks/task-runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const log = require('@hkube/logger').GetLogFromContainer();
const component = require('../../common/consts/componentNames').TASK_RUNNER;
const { metricsNames } = require('../consts/metricsNames');
const { tracer, metrics, utils } = require('@hkube/metrics');
const graphStore = require('../datastore/graph-store');

metrics.addTimeMeasure({
name: metricsNames.pipelines_net,
Expand Down Expand Up @@ -109,6 +110,8 @@ class TaskRunner {
this._runNode(node.nodeName, node.parentOutput, node.index);
});
this._progress = new Progress({ calcProgress: this._nodes.calcProgress, sendProgress: this._stateManager.setJobStatus });
graphStore.start(job.data.jobId, this._nodes);


await this._stateManager.watchTasks({ jobId: this._jobId });
const watchState = await this._stateManager.watchJobState({ jobId: this._jobId });
Expand Down Expand Up @@ -195,6 +198,7 @@ class TaskRunner {
this._job = null;
this._jobId = null;
this._stateManager.clean();
graphStore.stop();
this._stateManager = null;
this._progress = null;
}
Expand Down

0 comments on commit e1b67fa

Please sign in to comment.