Skip to content

Commit

Permalink
Add support for multiple pMapSkip's (#52)
Browse files Browse the repository at this point in the history
Co-authored-by: Sindre Sorhus <[email protected]>
  • Loading branch information
ferrinweb and sindresorhus authored Nov 2, 2021
1 parent e7ca665 commit 94eb532
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 9 deletions.
32 changes: 23 additions & 9 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export default async function pMap(

const result = [];
const errors = [];
const skippedIndexes = [];
const skippedIndexesMap = new Map();
let isRejected = false;
let isResolved = false;
let isIterableDone = false;
Expand Down Expand Up @@ -59,15 +59,28 @@ export default async function pMap(
if (resolvingCount === 0 && !isResolved) {
if (!stopOnError && errors.length > 0) {
reject(new AggregateError(errors));
} else {
isResolved = true;
return;
}

for (const skippedIndex of skippedIndexes) {
result.splice(skippedIndex, 1);
}
isResolved = true;

if (!skippedIndexesMap.size) {
resolve(result);
return;
}

const pureResult = [];

// Support multiple `pMapSkip`'s.
for (const [index, value] of result.entries()) {
if (skippedIndexesMap.get(index) === pMapSkip) {
continue;
}

pureResult.push(value);
}

resolve(pureResult);
}

return;
Expand All @@ -86,12 +99,13 @@ export default async function pMap(

const value = await mapper(element, index);

// Use Map to stage the index of the element.
if (value === pMapSkip) {
skippedIndexes.push(index);
} else {
result[index] = value;
skippedIndexesMap.set(index, value);
}

result[index] = value;

resolvingCount--;
await next();
} catch (error) {
Expand Down
37 changes: 37 additions & 0 deletions test-multiple-pmapskips-performance.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import test from 'ava';
import inRange from 'in-range';
import timeSpan from 'time-span';
import pMap, {pMapSkip} from './index.js';

function generateSkipPerformanceData(length) {
const data = [];
for (let index = 0; index < length; index++) {
data.push(pMapSkip);
}

return data;
}

test('multiple pMapSkips - algorithmic complexity', async t => {
const testData = [generateSkipPerformanceData(1000), generateSkipPerformanceData(10000), generateSkipPerformanceData(100000)];
const testDurationsMS = [];

for (const data of testData) {
const end = timeSpan();
// eslint-disable-next-line no-await-in-loop
await pMap(data, async value => value);
testDurationsMS.push(end());
}

for (let index = 0; index < testDurationsMS.length - 1; index++) {
// Time for 10x more items should take between 9x and 11x more time.
const smallerDuration = testDurationsMS[index];
const longerDuration = testDurationsMS[index + 1];

// The longer test needs to be a little longer and also not 10x more than the
// shorter test. This is not perfect... there is some fluctuation.
// The idea here is to catch a regression that makes `pMapSkip` handling O(n^2)
// on the number of `pMapSkip` items in the input.
t.true(inRange(longerDuration, {start: 1.2 * smallerDuration, end: 15 * smallerDuration}));
}
});
44 changes: 44 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,28 @@ test('pMapSkip', async t => {
], async value => value), [1, 2]);
});

test('multiple pMapSkips', async t => {
t.deepEqual(await pMap([
1,
pMapSkip,
2,
pMapSkip,
3,
pMapSkip,
pMapSkip,
4
], async value => value), [1, 2, 3, 4]);
});

test('all pMapSkips', async t => {
t.deepEqual(await pMap([
pMapSkip,
pMapSkip,
pMapSkip,
pMapSkip
], async value => value), []);
});

test('all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
const input = [1, async () => delay(300, {value: 2}), 3];
const mappedValues = [];
Expand Down Expand Up @@ -269,6 +291,28 @@ test('asyncIterator - pMapSkip', async t => {
]), async value => value), [1, 2]);
});

test('asyncIterator - multiple pMapSkips', async t => {
t.deepEqual(await pMap(new AsyncTestData([
1,
pMapSkip,
2,
pMapSkip,
3,
pMapSkip,
pMapSkip,
4
]), async value => value), [1, 2, 3, 4]);
});

test('asyncIterator - all pMapSkips', async t => {
t.deepEqual(await pMap(new AsyncTestData([
pMapSkip,
pMapSkip,
pMapSkip,
pMapSkip
]), async value => value), []);
});

test('asyncIterator - all mappers should run when concurrency is infinite, even after stop-on-error happened', async t => {
const input = [1, async () => delay(300, {value: 2}), 3];
const mappedValues = [];
Expand Down

0 comments on commit 94eb532

Please sign in to comment.