Skip to content

Commit

Permalink
Merge pull request #637 from vergenzt/auto-concurrency
Browse files Browse the repository at this point in the history
Add concurrency argument to async.auto
  • Loading branch information
aearly committed Oct 26, 2015
2 parents 56bdb50 + f801c68 commit fbeab9b
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 3 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1256,7 +1256,7 @@ cargo.push({name: 'baz'}, function (err) {
---------------------------------------
<a name="auto" />
### auto(tasks, [callback])
### auto(tasks, [callback], [concurrency])
Determines the best order for running the functions in `tasks`, based on their requirements. Each function can optionally depend on other functions being completed first, and each function is run as soon as its requirements are satisfied.
Expand Down Expand Up @@ -1307,6 +1307,8 @@ __Arguments__
pass an error to their callback. Results are always returned; however, if
an error occurs, no further `tasks` will be performed, and the results
object will only contain partial results.
* `concurrency` - An `integer` for determining the maximum number of tasks that
can be run in parallel. By default, as many as possible.
__Example__
Expand Down
11 changes: 9 additions & 2 deletions lib/async.js
Original file line number Diff line number Diff line change
Expand Up @@ -509,15 +509,19 @@
}
};

async.auto = function (tasks, callback) {
async.auto = function (tasks, callback, concurrency) {
callback = _once(callback || noop);
var keys = _keys(tasks);
var remainingTasks = keys.length;
if (!remainingTasks) {
return callback(null);
}
if (!concurrency) {
concurrency = remainingTasks;
}

var results = {};
var runningTasks = 0;

var listeners = [];
function addListener(fn) {
Expand All @@ -543,6 +547,7 @@
_arrayEach(keys, function (k) {
var task = _isArray(tasks[k]) ? tasks[k]: [tasks[k]];
var taskCallback = _restParam(function(err, args) {
runningTasks--;
if (args.length <= 1) {
args = args[0];
}
Expand Down Expand Up @@ -572,18 +577,20 @@
}
}
function ready() {
return _reduce(requires, function (a, x) {
return runningTasks < concurrency && _reduce(requires, function (a, x) {
return (a && results.hasOwnProperty(x));
}, true) && !results.hasOwnProperty(k);
}
if (ready()) {
runningTasks++;
task[task.length - 1](taskCallback, results);
}
else {
addListener(listener);
}
function listener() {
if (ready()) {
runningTasks++;
removeListener(listener);
task[task.length - 1](taskCallback, results);
}
Expand Down
29 changes: 29 additions & 0 deletions test/test-async.js
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,35 @@ exports['auto'] = function(test){
});
};

exports['auto concurrency'] = function (test) {
var concurrency = 2;
var runningTasks = [];
var makeCallback = function(taskName) {
return function(callback) {
runningTasks.push(taskName);
setTimeout(function(){
// Each task returns the array of running tasks as results.
var result = runningTasks.slice(0);
runningTasks.splice(runningTasks.indexOf(taskName), 1);
callback(null, result);
});
};
};
async.auto({
task1: ['task2', makeCallback('task1')],
task2: makeCallback('task2'),
task3: ['task2', makeCallback('task3')],
task4: ['task1', 'task2', makeCallback('task4')],
task5: ['task2', makeCallback('task5')],
task6: ['task2', makeCallback('task6')]
}, function(err, results){
Object.keys(results).forEach(function(taskName) {
test.ok(results[taskName].length <= concurrency);
});
test.done();
}, concurrency);
};

exports['auto petrify'] = function (test) {
var callOrder = [];
async.auto({
Expand Down

0 comments on commit fbeab9b

Please sign in to comment.