Skip to content

Commit

Permalink
wf fix abort logic
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeTWC1984 committed Oct 26, 2023
1 parent d8c6233 commit f3a3442
Showing 1 changed file with 22 additions and 13 deletions.
35 changes: 22 additions & 13 deletions bin/workflow.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ let finishingJobs = {} // a map for

let errorCount = 0
let shuttingDown = false
let aborting = false
let normalShutDown = false
let exceededMaxErrors = false
let max_errors = parseInt(process.env['WF_MAXERR']);

const print = (text) => {
Expand Down Expand Up @@ -51,7 +54,10 @@ function sleep(millis) { return new Promise(resolve => setTimeout(resolve, milli

async function abortPending() {

if(shuttingDown) return
if(normalShutDown) return
if(aborting) return

aborting = true
shuttingDown = true

print(' │')
Expand Down Expand Up @@ -98,7 +104,8 @@ stream.on('json', function (job) {
let opts = job.options || {}

let startFrom = opts.wf_start_from_step || 1
if (startFrom > taskList.length) throw new Error('"Start From" parameter cannot exceed event list length')
if (startFrom > taskList.length) stream.write({complete: 1, code: 1, description: '"Start From" parameter cannot exceed event list length'})
// throw new Error('"Start From" parameter cannot exceed event list length')

let skip = taskList
.filter((e, i) => (i + 1 < startFrom) || !!e.disabled)
Expand All @@ -110,7 +117,8 @@ stream.on('json', function (job) {
if (concur > taskList.length) concur = taskList.length

/// sanity check
if (taskList.length == 0) throw new Error('At least one workflow event is required');
if (taskList.length == 0) stream.write({complete: 1, code: 1, description: 'At least one workflow event is required'})
// throw new Error('At least one workflow event is required');
if (taskList.filter(e => e.id == process.env['JOB_EVENT']).length > 0) throw new Error("Workflow refers to itself");

async function poll() {
Expand Down Expand Up @@ -254,7 +262,8 @@ stream.on('json', function (job) {
}
else {
finishingJobs[j] = 1
print(` ├───────> ${jobStatus[j].title} (${j}) is completed, waiting or status`)
print(` ├───────> ${jobStatus[j].title} (${j}): faild to fetch status due to [${e.message}]. Will retry few more times`)
stream.write({memo: `API error: ${j}`})
}

// if cannot retreive job detail, mark job as "incomplete" and retry on the few next cycles before reporting an error
Expand All @@ -265,7 +274,7 @@ stream.on('json', function (job) {
continue
}
else {
print(` │ ⚠️ Failed to retreive data from get_job_details due to: ${e.message}`)
print(` │ ⚠️ Failed to fetch ${j} status after multiple attempts, marking job as failed`)
jd = {
data: {
job: {
Expand Down Expand Up @@ -343,22 +352,20 @@ stream.on('json', function (job) {

print(msg);

if (max_errors && errorCount >= max_errors) {
print(` │${EOL} ⚠️ Error count exceeded maximum, aborting workflow...`)
shuttingDown = true;
if (max_errors && errorCount >= max_errors && !aborting) {
print(` │${EOL} ⚠️ Error count exceeded maximum, aborting workflow...`)
exceededMaxErrors = true;
await abortPending();
throw new Error("WF Error count exceeded maximum");

}

print(' │');
stream.write({ progress: (1 - pendingJobs / taskList.length) })
}
}
} // for each job

}
} // while

if(shuttingDown) print(`${EOL}\u001b[1m\u001b[31mWorkflow Aborted\u001b[39m\u001b[22m @${(new Date()).toLocaleString()} `)
if(shuttingDown || aborting) print(`${EOL}\u001b[1m\u001b[31mWorkflow Aborted\u001b[39m\u001b[22m @${(new Date()).toLocaleString()} `)
else print(`${EOL}\u001b[1m\u001b[32mWorkflow Completed\u001b[39m\u001b[22m @${(new Date()).toLocaleString()} `)

// print performance
Expand Down Expand Up @@ -407,10 +414,12 @@ stream.on('json', function (job) {

let result = { complete: 1, code: 0 }
if (errorCount > 0) result = { complete: 1, code: (wf_strict ? 1 : 255), description: `WF - ${errorCount} out of ${taskList.length} jobs reported error` }
if (exceededMaxErrors) result = { complete: 1, code: 1, description: "WF Error count exceeded maximum" }
if (errorCount == taskList.length) result = { complete: 1, code: 1, description: "WF - All jobs failed" }
stream.write(result)

shuttingDown = true
normalShutDown = true // block abortPending on normal exit
if(process.connected) process.disconnect()

};
Expand Down

0 comments on commit f3a3442

Please sign in to comment.