Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: pipeline-driver-queue concurrent pipelines #1244

Merged
merged 3 commits into from
Apr 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/pipeline-driver-queue/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
node_modules
.git
9 changes: 8 additions & 1 deletion core/pipeline-driver-queue/dockerfile/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
ARG BASE_PRIVATE_REGISTRY=""
FROM ${BASE_PRIVATE_REGISTRY}node:14.5.0 as install
ADD ./package-lock.json ./package.json /hkube/pipeline-driver-queue/
WORKDIR /hkube/pipeline-driver-queue
RUN npm ci --production


ARG BASE_PRIVATE_REGISTRY=""
FROM ${BASE_PRIVATE_REGISTRY}hkube/base-node:v1.2.0
LABEL maintainer="[email protected]"
RUN mkdir /hkube
COPY . /hkube/pipeline-driver-queue
RUN cd /hkube/pipeline-driver-queue
COPY --from=install /hkube/pipeline-driver-queue/node_modules /hkube/pipeline-driver-queue/node_modules
WORKDIR /hkube/pipeline-driver-queue
CMD ["node", "app.js"]
8 changes: 0 additions & 8 deletions core/pipeline-driver-queue/dockerfile/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,10 @@ then
BASE_PRIVATE_REGISTRY="${BASE_PRIVATE_REGISTRY}/"
fi
docker build -t ${TAG_VER} --build-arg BASE_PRIVATE_REGISTRY="${BASE_PRIVATE_REGISTRY}" -f ./dockerfile/Dockerfile .
if [ "${TRAVIS_PULL_REQUEST:-"false"}" == "false" ] || [ -z "${TRAVIS_PULL_REQUEST}" ]; then
TAG_CUR="${IMAGE_NAME}:latest"
docker tag ${TAG_VER} "${TAG_CUR}"
fi

if [ -v PRIVATE_REGISTRY ]
then
echo docker push ${TAG_VER}
docker push ${TAG_VER}
if [[ -v TAG_CUR ]]; then
echo docker push ${TAG_CUR}
docker push ${TAG_CUR}
fi
fi

1 change: 1 addition & 0 deletions core/pipeline-driver-queue/lib/jobs/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class JobConsumer {
_pipelineToQueueAdapter(pipeline, jobData) {
return {
...jobData,
experimentName: pipeline.experimentName,
pipelineName: pipeline.name,
priority: pipeline.priority,
entranceTime: Date.now(),
Expand Down
24 changes: 11 additions & 13 deletions core/pipeline-driver-queue/lib/jobs/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ const { componentName } = require('../consts');
const component = componentName.JOBS_PRODUCER;
const persistence = require('../persistency/persistence');
const queueRunner = require('../queue-runner');
const JOB_ID_PREFIX_REGEX = /.+:(.+:)?/;

class JobProducer {
constructor() {
Expand Down Expand Up @@ -76,10 +75,10 @@ class JobProducer {
log.info(`${Events.ACTIVE} ${data.jobId}`, { component, jobId: data.jobId, status: Events.ACTIVE });
}).on(Events.COMPLETED, (data) => {
log.info(`${Events.COMPLETED} ${data.jobId}`, { component, jobId: data.jobId, status: Events.COMPLETED });
this._checkMaxExceeded(data.jobId);
this._checkMaxExceeded(data.options.data);
}).on(Events.FAILED, (data) => {
log.info(`${Events.FAILED} ${data.jobId}, ${data.error}`, { component, jobId: data.jobId, status: Events.FAILED });
this._checkMaxExceeded(data.jobId);
this._checkMaxExceeded(data.options.data);
}).on(Events.STALLED, (data) => {
log.warning(`${Events.STALLED} ${data.jobId}`, { component, jobId: data.jobId, status: Events.STALLED });
}).on(Events.CRASHED, async (data) => {
Expand All @@ -92,15 +91,13 @@ class JobProducer {
});
}

_checkMaxExceeded(jobId) {
const prefix = jobId.match(JOB_ID_PREFIX_REGEX);
if (prefix) {
const jobIdPrefix = prefix[0];
const job = queueRunner.queue.getQueue(q => q.maxExceeded).find(q => q.jobId.startsWith(jobIdPrefix));
if (job) {
log.info(`found and disable job with prefix ${jobIdPrefix} that marked as maxExceeded`, { component });
job.maxExceeded = false;
}
_checkMaxExceeded({ experiment, pipeline }) {
const job = queueRunner.queue
.getQueue(q => q.maxExceeded)
.find(q => q.experimentName === experiment && q.pipelineName === pipeline);
if (job) {
log.info(`found and disable job with experiment ${experiment} and pipeline ${pipeline} that marked as maxExceeded`, { component });
job.maxExceeded = false;
}
}

Expand All @@ -111,7 +108,8 @@ class JobProducer {
type: this._jobType,
data: {
jobId: pipeline.jobId,
pipeline: pipeline.pipelineName
pipeline: pipeline.pipelineName,
experiment: pipeline.experimentName
}
},
queue: {
Expand Down