Error: Could not remove job #1098

Closed
alolis opened this issue Oct 23, 2018 · 18 comments

Comments

@alolis
Contributor

alolis commented Oct 23, 2018

Description

I am experiencing the following error whenever I try to remove a job:

2018-10-23T14:08:46.619Z - debug: Error: Could not remove job 1
    at /myproject/node_modules/bull/lib/job.js:392:15
From previous event:
    at /myproject/node_modules/bull/lib/job.js:388:42
    at runCallback (timers.js:637:20)
    at tryOnImmediate (timers.js:610:5)
    at processImmediate [as _immediateCallback] (timers.js:582:5)

Minimal, Working Test code to reproduce the issue.

I am using the sandboxed environment and the issue is reproducible even with a simple processor like so:

// processor.js
module.exports = function myProcessor(job, done) {
  setTimeout(() => {
    done();
  }, 60000);
}

// normal code
import Queue from 'bull';

const queue = new Queue('myQueue');

queue.process('/path/to/my/processor.js');
await queue.add({foo: 'bar'}, {uuid: 1});

const job = await queue.getJob(1);
await job.remove(); // error here

Bull version

I am using Node.js v6.9.1 with bull v3.4.8 and redis-server v3.0.7

Also by looking at this I understand that I can remove jobs while in progress. Is this true? What is the correct way to force remove a job if indeed true?

Thank you for your time.

@manast
Member

manast commented Oct 23, 2018

You cannot remove a job while it is being processed. A job is in the active state and locked until it either completes, fails, or stalls, although in practice it should never stall if you are using a sandboxed processor.
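
A minimal sketch of what this constraint means for calling code: check the job's state first and only call remove() when the job is not active. The helper name `removeUnlessActive` is hypothetical and not part of bull; it relies only on the `job.isActive()` and `job.remove()` methods already used in this thread.

```javascript
// Hypothetical helper (not part of bull): remove a job only when it is not active.
async function removeUnlessActive(job) {
  // isActive() resolves to true while the job is in the active state.
  if (await job.isActive()) {
    return false; // the caller decides whether to wait, retry later, or cancel
  }
  await job.remove();
  return true;
}
```

The check and the removal are two separate round trips, so a job can still become active in between. Callers should still be prepared for remove() to reject.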

@alolis
Contributor Author

alolis commented Oct 24, 2018

@manast , thanks for getting back to me. Isn't there some workaround I can do without destroying the library? I find it very strange that nobody has requested this functionality! Any suggestion would be great since this is a must for my service :( Can I force fail it somehow and then remove it?

I would expect this to work but it didn't:

async remove() {
  await this.moveToFailed(new Error('removed by user'), true);
  return super.remove();
}

Also, I recommend to update the docs if you are planning to leave it as it is since it's not clear that this is intended or maybe change the error to be more descriptive.

@alolis
Contributor Author

alolis commented Oct 24, 2018

I tried to do the following which indeed removes the job from Redis but does not kill the processor process (makes sense if remove was intended for completed/failed and probably does not kill the pid).

async remove() {
  await this.releaseLock();
  return super.remove();
}

I didn't like it anyway because I do not want to mess with the locks.

Probably the only way to do this is to signal the processor via Redis pub/sub events and throw new Error('removed') in the processor so it exits as it normally would... unless there is something else I could use, in which case I am all ears! E.g., can I find the child process pid somehow via bull, force kill it, and then remove the job normally?

@manast
Member

manast commented Oct 24, 2018

We have some discussion regarding aborting jobs here: #114
Basically, there is no complication in just killing a running job if we are using a sandboxed processor; the challenge is how to do it so that the worker can be closed gracefully. It is still an open discussion.

@alolis
Contributor Author

alolis commented Oct 25, 2018

@manast thanks for getting back to me. That discussion is going on since 2015 from what I checked so it's not very promising :)

Last night I had a quick look at what's going on with cancel and I did the following change as an experiment:

// sandbox.js

'use strict';

var Promise = require('bluebird');

module.exports = function(processFile, childPool) {
  return function process(job) {
    return childPool.retain(processFile).then(function(child) {
      child.send({
        cmd: 'start',
        job: job
      });

      var done = new Promise(function(resolve, reject, onCancel) {      
        function handler(msg) {
          switch (msg.cmd) {
            case 'completed':
              child.removeListener('message', handler);
              resolve(msg.value);
              break;
            case 'failed':
            case 'error':
              child.removeListener('message', handler);
              var err = new Error();
              Object.assign(err, msg.value);
              reject(err);
              break;
            case 'progress':
              job.progress(msg.value);
              break;
          }
        }

        // Added onCancel handler!
        onCancel(() => {
          child.removeListener('message', handler);         
          reject(new Error('Job has been cancelled'));
        });

        child.on('message', handler);
      });

      return done.finally(function() {
        // Handle cancellation here!
       if (done.isCancelled()) {         
          job.discard();
          job.moveToFailed(new Error('cancelled'), true).then(() => child.kill());           
        }

        childPool.release(child);
      });
    });
  };
};

// normal code
queue.on('active', function(job, jobPromise) {
  // Wait a few seconds, just for testing, before cancelling
  setTimeout(() => {
    jobPromise.cancel();
  }, 20000);
});

Do you think you can provide me an alternative which might be better? The above however does not cover handlers that are not sandboxed. I think this should be done in queue.js by wrapping the handler to make it cancellable and reject the promise returned by the processor inside the onCancel callback.
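
As a rough illustration of the "wrap the handler to make it cancellable" idea, here is a sketch using native promises instead of bluebird. The name `makeCancellable` and the `{ promise, cancel }` return shape are assumptions made for this example; nothing like this exists in bull's queue.js.

```javascript
// Hypothetical wrapper: lets the caller reject a handler's promise early.
function makeCancellable(handler) {
  return function run(job) {
    let cancel;
    // This promise never resolves; it only rejects when cancel() is called.
    const cancelled = new Promise((resolve, reject) => {
      cancel = (reason = 'Job has been cancelled') => reject(new Error(reason));
    });
    // Whichever settles first wins. The handler itself keeps running after
    // a cancel; actually stopping the work still needs a kill or a signal.
    const work = Promise.resolve().then(() => handler(job));
    return { promise: Promise.race([work, cancelled]), cancel };
  };
}
```

Usage would look like `const { promise, cancel } = makeCancellable(handler)(job);`. Rejecting the promise only unblocks the queue side, which is why the sandboxed version above also kills the child.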

I do not mind changing my fork and using that for now in order to have abort functionality but I must really implement something so I can complete the project I am doing so please, if you have a better suggestion I am all ears, even if its a bit ugly!

Not sure if you are interested in this but I pushed a PR anyway #1105

@manast
Member

manast commented Oct 25, 2018

Unfortunately I have quite a lot of work right now, so I could not say when I can implement such a feature. I do think the right approach is exactly the one you are pursuing, i.e. a cancel mechanism inspired by promise cancellation.

@alolis
Contributor Author

alolis commented Oct 26, 2018

@manast I understand, although I wasn't expecting a feature implementation at this point, just your thoughts on how to solve it, at least temporarily on my side. I did solve it yesterday with the sandbox approach I described above, but it also required a change in the Queue class. The complete code I am now using, in combination with the refactored sandbox, is the following:

import _ from 'lodash';
import Queue from 'bull';

class ExtendedQueue extends Queue {
  constructor(...args) {
    super(...args);

    this.activeJobPromises = {};

    this.on('active', (job, jobPromise) => {
      _.set(this.activeJobPromises, parseInt(job.id), jobPromise);
    });

    this.on('completed', (job) => {
      _.unset(this.activeJobPromises, parseInt(job.id));
    });

    this.on('stalled', (job) => {
      _.unset(this.activeJobPromises, parseInt(job.id));
    });

    this.on('failed', (job) => {
      _.unset(this.activeJobPromises, parseInt(job.id));
    });
  }

  getActiveJobPromise(jobId) {
    return _.get(this.activeJobPromises, parseInt(jobId));
  }
}

class ExtendedJob extends Queue.Job {
  async remove() {
    if (await this.isActive()) { // isActive() returns a promise, so it must be awaited
      const jobPromise = this.queue.getActiveJobPromise(this.id);
      jobPromise.cancel();
      await this.releaseLock();
    }

    await super.remove();
  }

  async moveToFailed(error, ignoreLock) {
    await super.moveToFailed(error, ignoreLock);
    this.queue.emit('failed', this, error, 'active');
  }
}

As you can see above, I keep track of the job promises inside the Queue class and cancel the promise on remove, before removing the job itself. It seems to work well so far but I will keep testing.

@simpleshadow

@alolis When you have a chance, mind sharing how you used the above?

Also can't believe no one else has asked to abort a job!

@manast
Member

manast commented Nov 18, 2018

@simpleshadow aborting a job at the end of the day will require your process function to be prepared for it, so basically you could also implement some signalling mechanism to your process function so that your code can fail the process earlier. All in all, a hard problem to solve but we are working on a more integrated solution.
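
One way to picture a process function that is "prepared for it" is a cooperative check between units of work. The sketch below is only an illustration: `CancellationToken` and `processInSteps` are hypothetical names, and how the token gets flipped (Redis pub/sub, IPC, and so on) is left out.

```javascript
// Hypothetical token that some signalling mechanism would flip.
class CancellationToken {
  constructor() {
    this.cancelled = false;
  }
  cancel() {
    this.cancelled = true;
  }
  throwIfCancelled() {
    if (this.cancelled) {
      throw new Error('Job has been cancelled');
    }
  }
}

// Runs the steps in order and fails early once the token is cancelled.
async function processInSteps(job, steps, token) {
  const results = [];
  for (const step of steps) {
    token.throwIfCancelled(); // checked before each step, not during one
    results.push(await step(job));
  }
  return results;
}
```

The job only stops at step boundaries, so a single long-running step still runs to completion. That is the trade-off of cooperative cancellation compared to killing a sandboxed child.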

@alolis
Contributor Author

alolis commented Nov 19, 2018

@simpleshadow , it has been a major pain point for my use case unfortunately, along with this.

I came up with two solutions for the abort based on the current design of bull. I will provide you both and perhaps you can suggest some improvement until a proper cancellation mechanism is implemented in the library.

Both solutions require you to extend the Job and Queue classes of bull.

SOLUTION 1

Overview:

Although bull already has a mechanism for the main process to communicate with the forked children (which run the processor code) for its own needs, this API is unfortunately not exposed. We therefore need a custom internal event emitter that goes through Redis to allow communication between the main code and the forked processors. Check this issue as well.

When a job is removed, a custom job:cancelled event is emitted. The processor listens for job:cancelled events and, if its job.id matches the job id from the event, calls process.exit(), which kills the process. The queue's handleFailed then kicks in, the job is automatically moved to the failed state, and the lock is released, allowing you to execute job.remove() to complete the removal.

The advantage of this solution is that NO changes are required to the bull library (except for a small bug I found that won't allow this solution to work; I will submit a PR for it soon and provide more info at the end of this post). By extending a couple of classes, the limitation can be overcome.

Problems

  1. The job must be discarded before anything else; otherwise, when the job is moved to failed, it will automatically be retried, which we do not want. To discard the active job that the underlying processor is using, we need access to the already instantiated object. To get it, we keep track of those objects whenever an active event fires (the event carries the object) and discard by reference. We solve this by extending the Queue class.

  2. If you add and remove a job too quickly, the child process might not have been forked yet (it takes a couple of seconds for the process to start), so when you emit the cancellation event the processor might not receive it and will never exit. This is the reason we didn't roll this solution out in production; I didn't have more time to think about how to solve it elegantly.
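
To make the race in problem 2 concrete, here is a sketch of one possible mitigation, not something from the code in this post: remember cancelled job ids so that a processor which starts late can still find out on startup. `CancellationRegistry` is a hypothetical name, and the in-memory Set is only a stand-in; across real processes the ids would have to live in a shared store such as Redis.

```javascript
// Hypothetical registry; in-memory stand-in for a store shared between processes.
class CancellationRegistry {
  constructor() {
    this.cancelledIds = new Set();
    this.listeners = new Map();
  }

  // Called by the main code: record the cancellation and notify if possible.
  cancel(jobId) {
    const id = String(jobId);
    this.cancelledIds.add(id);
    const listener = this.listeners.get(id);
    if (listener) {
      listener();
    }
  }

  // Called by the processor when it starts. Fires immediately when the job
  // was cancelled before the processor was ready, which closes the race.
  onCancelled(jobId, listener) {
    const id = String(jobId);
    if (this.cancelledIds.has(id)) {
      listener();
      return;
    }
    this.listeners.set(id, listener);
  }
}
```

Ids are normalised to strings because job ids may arrive as numbers on one side and strings on the other.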

Code

import Queue from 'bull';
import _ from 'lodash';

/**
 * The class will extend the main `Queue` class from bull in order to work around
 * a few limitations of the underlying library.
 */
class ExtendedQueue extends Queue {
  /**
   * Override the default constructor in order to keep track of the job objects
   * for all active jobs. The reason we do this is because we need to be able
   * to discard the already instantiated job objects which the queue processor
   * is using and doing it by reference is the only way at the moment.
   *
   * @override
   * @param  {Array} args - Array of arguments that will be passed to parent class.
   */
  constructor(...args) {
    super(...args);

    this.activeJobs = {};

    this.on('active', (job, jobPromise) => {
      _.set(this.activeJobs, parseInt(job.id), job);
    });

    // Stop tracking the job as soon as it leaves the active state.
    const unsetEvents = ['completed', 'stalled', 'removed', 'failed'];
    _.forEach(unsetEvents, (unsetEvent) => {
      this.on(unsetEvent, (job) => {
        _.unset(this.activeJobs, parseInt(job.id));
      });
    });
  }

  /**
   * Returns the job object for an active job.
   *
   * @param {string|number} jobId - The job id.
   * @returns {Object} - Returns the job object or undefined if the job is not found.
   */
  getActiveJob(jobId) {
    return _.get(this.activeJobs, parseInt(jobId));
  }
}

/**
 * The class will extend the `Job` class from the bull module in order to work around
 * a few limitations of the underlying library.
 */
class ExtendedJob extends Queue.Job {
  /**
   * Will cancel an active job promise.
   */
  async cancel() {
    if (await this.isActive()) {
      const activeJob = this.queue.getActiveJob(this.id);
      activeJob.discard();

      getInternalEventEmitter().emit('job:cancelled', this.id);

      try {
        await this.finished();
      } catch (e) {
        // We do not care at this point; we just need to wait until queue does its thing
        // and locks have been released.
      }
    }
  }

  /**
   * Remove a job from queue, even if its in progress.
   *
   * @override
   */
  async remove() {
    await this.cancel();
    await super.remove();
  }
}

// Override the default Job prototype in order for all Job objects to actually be ExtendedJob objects
ExtendedQueue.Job.prototype = ExtendedJob.prototype

/**
 * Wraps a processor and adds common functionality in order to avoid code duplication.
 * Make sure that ALL processors are exported by being wrapped first.
 *
 * @param {function} processor
 * @throws {JobError}
 * @returns {Promise}
 */
function wrapProcessor(processor) {
  function initializeEventHandlers(job) {
    const internalEmitter = getInternalEventEmitter();

    internalEmitter.on('job:cancelled', (jobId) => {
      if (job.id === jobId) {
        process.exit();
      }
    });
  }

  async function processorWrapper(job) {
    initializeEventHandlers(job);

    const result = await processor(job);
    
    return result;
  }

  return processorWrapper;
}

/**
 * Emits/Listens an event with the use of a Redis publisher. The reason that we need this function
 * is because `bull` does not provide a way for the forked child job processes to communicate with
 * the main worker and due to limitation of the sandboxed environment, we can not call `job.update`
 * directly from within that environment, so, any job updates must happen in the main worker. Therefore,
 * we need to be able to actually send data to the main worker somehow and this is done via the,
 * already in place, pub/sub system of Redis.
 *
 * @see {@link https://github.com/OptimalBits/bull/issues/1056}
 * @see {@link https://github.com/OptimalBits/bull/issues/1067}
 * @returns {Object} - An object with emit and on methods.
 */
function getInternalEventEmitter() {
  const channelName = config.get('jobs.internalChannelName');
  const publisher = Redis.getPublisher();
  const subscriber = Redis.getSubscriber(channelName);

  const handleMessage = (eventName, handler) => {
    subscriber.on('message', (unusedChannel, message) => {
      const {event, data} = JSON.parse(message);

      if (eventName === event || eventName === '*') {
        handler(data, event);
      }
    });
  };

  return {
    emit: (event, data) => publisher.publish(channelName, JSON.stringify({event, data})),
    on: (event, handler) => handleMessage(event, handler),
    unsubscribe: () => subscriber.unsubscribe()
  };
}

Example Usage

// processor.js
async function myProcessor(job) {
  // whatever
}

export default wrapProcessor(myProcessor);

// main code
const queue = new ExtendedQueue('myQueue');
queue.process('/path/to/processor.js');
queue.add(...);
const job = await queue.getJob(1); // job is an ExtendedJob object
await job.remove(); // should kill child process and remove from queue

SOLUTION 2

Overview:

Use the bluebird cancellation mechanism in sandbox.js. The queue processor will create a Promise for each sandboxed processor which will attach to the active event of the queue. We can use that promise to execute a jobPromise.cancel() which will trigger a rejection within the sandbox and kill the child process from the child pool (which the sandbox has access to).

I had to use my own fork in order to achieve this solution. The only changes that are required can be found here:
https://github.com/alolis/bull/blob/rapiddot/lib/process/sandbox.js

Problems

  1. Bluebird cancellation is not enabled in bull and in fact, bluebird will be removed completely from future versions and native promises will be used instead. We solve this in our forked version of bull.

  2. Even if a job promise is cancelled, there is no cancellation handler in the sandbox processor. We need one in order to detect the cancellation and kill the child process from within the sandboxed processor. We solve this in our forked version of bull.

  3. The promise returned from the queue processor is only available during the active event so we need to keep track of those promises. We solve this by extending the Queue class.

Code

import Queue from 'bull';
import _ from 'lodash';

/**
 * The class will extend the main `Queue` class from bull in order to work around
 * a few limitations of the underlying library.
 */
class ExtendedQueue extends Queue {
  /**
   * Override the default constructor in order to keep track of the job promises
   * for all active jobs. The reason we do this is because we need to be able
   * to cancel a job whenever required (e.g. when a job is removed).
   *
   * @override
   * @param  {Array} args - Array of arguments that will be passed to parent class.
   */
  constructor(...args) {
    super(...args);

    this.activeJobPromises = {};

    this.on('active', (job, jobPromise) => {
      _.set(this.activeJobPromises, parseInt(job.id), jobPromise);
    });

    const unsetEvents = ['completed', 'stalled', 'removed', 'failed'];
    _.forEach(unsetEvents, (unsetEvent) => {
      this.on(unsetEvent, (job) => {
        _.unset(this.activeJobPromises, parseInt(job.id));
      });
    });
  }

  /**
   * Returns the promise for an active job.
   *
   * @param {string|number} jobId - The job id.
   * @returns {Promise} - Returns the job promise or undefined if the job is not found.
   */
  getActiveJobPromise(jobId) {
    return _.get(this.activeJobPromises, parseInt(jobId));
  }
}

/**
 * The class will extend the `Job` class from the bull module in order to work around
 * a few limitations of the underlying library.
 */
class ExtendedJob extends Queue.Job {
  /**
   * Will cancel an active job promise.
   */
  async cancel() {
    if (await this.isActive()) {
      const activeJobPromise = this.queue.getActiveJobPromise(this.id);
      activeJobPromise.cancel();

      try {
        await this.finished();
      } catch (e) {
        // We do not care at this point; we just need to wait until queue does its thing
        // and locks have been released.
      }
    }
  }

  /**
   * Remove a job from queue, even if its in progress.
   *
   * @override
   */
  async remove() {
    await this.cancel();
    await super.remove();
  }
}

// Override the default Job prototype in order for all Job objects to actually be ExtendedJob objects
ExtendedQueue.Job.prototype = ExtendedJob.prototype

Example Usage

// processor.js
async function myProcessor(job) {
  // whatever
}

export default myProcessor; // no wrapper needed here; the forked sandbox.js handles cancellation

// main code
const queue = new ExtendedQueue('myQueue');
queue.process('/path/to/processor.js');
queue.add(...);
const job = await queue.getJob(1); // job is an ExtendedJob object
await job.remove(); // should kill child process and remove from queue

The appropriate solution

In my opinion the correct solution to the whole issue is to allow attaching optional hooks to the processor by changing the processor signature to:

function myProcessor(job, {onCancel}) {
  onCancel(() => {
    // register our handler to the queue which initializes the processor
  });
}

And then you will be able to do things like:

queue.cancelJob(jobId)
job.cancel()
job.remove() // which internally will call job.cancel() first

The processor will have the chance to do a graceful shutdown before exiting or, even better, to throw a CancellationError so that the queue can apply special handling, such as ensuring that the child process is no longer running.

The above can also be extended with onPause and onResume handlers in order to pause the processor for a bit.

If no hooks are provided, the process should be force-killed and the job moved to failed with 'cancelled' as the failed reason.
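
A small sketch of how such hooks could be wired up, purely to illustrate the proposal. `HookedRunner`, its `run` method and this `cancelJob` are hypothetical; none of them exists in bull, and the force-kill fallback is reduced here to a `false` return value.

```javascript
// Hypothetical runner illustrating the proposed processor(job, {onCancel}) signature.
class HookedRunner {
  constructor() {
    this.cancelHandlers = new Map();
  }

  // Runs the processor and lets it register a cancel hook for its job.
  run(job, processor) {
    const onCancel = (handler) => this.cancelHandlers.set(job.id, handler);
    return Promise.resolve()
      .then(() => processor(job, { onCancel }))
      .finally(() => this.cancelHandlers.delete(job.id));
  }

  // Returns true when a hook was called. false means no hook is registered,
  // which is where the caller would fall back to force-killing the process.
  cancelJob(jobId) {
    const handler = this.cancelHandlers.get(jobId);
    if (!handler) {
      return false;
    }
    handler();
    return true;
  }
}
```

A processor would typically use the hook to reject its own promise or to start a graceful shutdown.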

Bug in bull that will not allow Solution 1 to work without patching:

Although Solution 1 is less intrusive, it will not work due to a bug in sandbox.js. The bug has to do with how the child pool re-uses forked processes. If a process exits, it is dead and should not be re-used, but the current code does not take this into consideration and pushes the child back to the pool; if it is re-used you get an Error: channel is closed message. The solution is to change the sandbox.js file and instead of this:

return done.finally(function() {
  childPool.release(child);
});

to do this:

return done.finally(function() {
  if (child.exitCode !== null) {
    childPool.remove(child);
  } else {
    childPool.release(child);
  }
});

The above fix is already included in my fork, along with some cleanup of the child handlers when the child process exits, in case someone wants to check.
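
The same check can be sketched as a standalone helper. The name `releaseOrDiscard` is hypothetical, and the pool is assumed to expose only the `release` and `remove` methods used above. The extra `signalCode` test is an addition of this sketch: in Node.js a child terminated by a signal keeps `exitCode` at null and sets `signalCode` instead, so checking `exitCode` alone may miss a killed child.

```javascript
// Hypothetical helper: return a child to the pool only if it is still alive.
function releaseOrDiscard(childPool, child) {
  // exitCode is set on a normal exit, signalCode when killed by a signal.
  const hasExited = child.exitCode !== null || child.signalCode !== null;
  if (hasExited) {
    childPool.remove(child); // a dead process must never be handed out again
  } else {
    childPool.release(child);
  }
  return !hasExited;
}
```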

@manast this section will definitely interest you, I am planning to make a PR soon unless you beat me to it.

Any comments/ideas/thoughts on all the above are welcome of course. I have already rolled out Solution 2 in production and it seems to be working smoothly, although I will work on fixing Solution 1 in order to avoid using a custom fork.

@simpleshadow

simpleshadow commented Nov 20, 2018

Thank you for all your work and detail @alolis.

I ended up using Bull only for its queue and handling the forking and killing of processes outside of Bull. When a job needs to be aborted, the job's forked process is killed and its done method is called to complete it with a result indicating it was aborted, along with data for the new job that replaces it. In the completion callback, noting the job was aborted, the new data is added to the queue. This also requires storing and checking dates in a separate DB outside of the Redis queue (for the assets the forked process jobs are creating) so the processor knows whether or not to process a job in stalled cases.

The disconnect between the queue jobs and forked processes is concerning and will require a bit more testing, but I think satisfactory for my use case.

I'll try using your examples in the coming days and let you know if they work out better for me. Thanks again.

@alolis
Contributor Author

alolis commented Nov 20, 2018

@simpleshadow, can you share some more details? Are you storing the forked process pid somewhere in order to kill the process afterwards, or did you mean that you are forking the process yourself? How does the whole thing work exactly?

I will be waiting for your feedback as well :)

@kzidane

kzidane commented Feb 11, 2019

@alolis @simpleshadow I'm curious if you ever found cleaner ways to achieve this?

@alolis
Contributor Author

alolis commented Feb 11, 2019

@kzidane I haven't had the time to do something else other than what I described above. For the problem to be solved properly the fix must be done in the core library.

@meetbryce

Any chance this feature is still on the cards?

@kodeine

kodeine commented Feb 10, 2021

any work on this yet?

@kodeine

kodeine commented Feb 10, 2021

@alolis would you be able to update your fork with latest changes in bull merged with your updates?

@alolis
Contributor Author

alolis commented Aug 16, 2021

@kodeine, my apologies for the very long delay in replying. I was trying to find some time to write everything more clearly and...well, you know what they say about time!

If you still need it, you can find the latest approach here.

Let me know if you have any further questions.
