Skip to content

Commit

Permalink
fix: pipeline-driver-queue concurrent pipelines (#1244)
Browse files Browse the repository at this point in the history
* fix: pipeline-driver-queue concurrentPipelines

* fix two stage build build

Co-authored-by: Yehiyam Livneh <[email protected]>
  • Loading branch information
nassiharel and yehiyam authored Apr 28, 2021
1 parent 3d77c92 commit d494e40
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 22 deletions.
2 changes: 2 additions & 0 deletions core/pipeline-driver-queue/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
node_modules
.git
9 changes: 8 additions & 1 deletion core/pipeline-driver-queue/dockerfile/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
ARG BASE_PRIVATE_REGISTRY=""
FROM ${BASE_PRIVATE_REGISTRY}node:14.5.0 as install
ADD ./package-lock.json ./package.json /hkube/pipeline-driver-queue/
WORKDIR /hkube/pipeline-driver-queue
RUN npm ci --production


ARG BASE_PRIVATE_REGISTRY=""
FROM ${BASE_PRIVATE_REGISTRY}hkube/base-node:v1.2.0
LABEL maintainer="[email protected]"
RUN mkdir /hkube
COPY . /hkube/pipeline-driver-queue
RUN cd /hkube/pipeline-driver-queue
COPY --from=install /hkube/pipeline-driver-queue/node_modules /hkube/pipeline-driver-queue/node_modules
WORKDIR /hkube/pipeline-driver-queue
CMD ["node", "app.js"]
8 changes: 0 additions & 8 deletions core/pipeline-driver-queue/dockerfile/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,10 @@ then
BASE_PRIVATE_REGISTRY="${BASE_PRIVATE_REGISTRY}/"
fi
docker build -t ${TAG_VER} --build-arg BASE_PRIVATE_REGISTRY="${BASE_PRIVATE_REGISTRY}" -f ./dockerfile/Dockerfile .
if [ "${TRAVIS_PULL_REQUEST:-"false"}" == "false" ] || [ -z "${TRAVIS_PULL_REQUEST}" ]; then
TAG_CUR="${IMAGE_NAME}:latest"
docker tag ${TAG_VER} "${TAG_CUR}"
fi

if [ -v PRIVATE_REGISTRY ]
then
echo docker push ${TAG_VER}
docker push ${TAG_VER}
if [[ -v TAG_CUR ]]; then
echo docker push ${TAG_CUR}
docker push ${TAG_CUR}
fi
fi

1 change: 1 addition & 0 deletions core/pipeline-driver-queue/lib/jobs/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class JobConsumer {
_pipelineToQueueAdapter(pipeline, jobData) {
return {
...jobData,
experimentName: pipeline.experimentName,
pipelineName: pipeline.name,
priority: pipeline.priority,
entranceTime: Date.now(),
Expand Down
24 changes: 11 additions & 13 deletions core/pipeline-driver-queue/lib/jobs/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ const { componentName } = require('../consts');
const component = componentName.JOBS_PRODUCER;
const persistence = require('../persistency/persistence');
const queueRunner = require('../queue-runner');
const JOB_ID_PREFIX_REGEX = /.+:(.+:)?/;

class JobProducer {
constructor() {
Expand Down Expand Up @@ -76,10 +75,10 @@ class JobProducer {
log.info(`${Events.ACTIVE} ${data.jobId}`, { component, jobId: data.jobId, status: Events.ACTIVE });
}).on(Events.COMPLETED, (data) => {
log.info(`${Events.COMPLETED} ${data.jobId}`, { component, jobId: data.jobId, status: Events.COMPLETED });
this._checkMaxExceeded(data.jobId);
this._checkMaxExceeded(data.options.data);
}).on(Events.FAILED, (data) => {
log.info(`${Events.FAILED} ${data.jobId}, ${data.error}`, { component, jobId: data.jobId, status: Events.FAILED });
this._checkMaxExceeded(data.jobId);
this._checkMaxExceeded(data.options.data);
}).on(Events.STALLED, (data) => {
log.warning(`${Events.STALLED} ${data.jobId}`, { component, jobId: data.jobId, status: Events.STALLED });
}).on(Events.CRASHED, async (data) => {
Expand All @@ -92,15 +91,13 @@ class JobProducer {
});
}

_checkMaxExceeded(jobId) {
const prefix = jobId.match(JOB_ID_PREFIX_REGEX);
if (prefix) {
const jobIdPrefix = prefix[0];
const job = queueRunner.queue.getQueue(q => q.maxExceeded).find(q => q.jobId.startsWith(jobIdPrefix));
if (job) {
log.info(`found and disable job with prefix ${jobIdPrefix} that marked as maxExceeded`, { component });
job.maxExceeded = false;
}
_checkMaxExceeded({ experiment, pipeline }) {
const job = queueRunner.queue
.getQueue(q => q.maxExceeded)
.find(q => q.experimentName === experiment && q.pipelineName === pipeline);
if (job) {
log.info(`found and disable job with experiment ${experiment} and pipeline ${pipeline} that marked as maxExceeded`, { component });
job.maxExceeded = false;
}
}

Expand All @@ -111,7 +108,8 @@ class JobProducer {
type: this._jobType,
data: {
jobId: pipeline.jobId,
pipeline: pipeline.pipelineName
pipeline: pipeline.pipelineName,
experiment: pipeline.experimentName
}
},
queue: {
Expand Down

0 comments on commit d494e40

Please sign in to comment.