-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathbatchProcessor.js
109 lines (103 loc) · 3.5 KB
/
batchProcessor.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/* eslint-disable no-underscore-dangle */
/* eslint-disable no-await-in-loop */
/**
 * BatchProcessor processes any paginated input stream in batches.
 * Fetch database records, API results, or any other paginated data page by page
 * and process each page as it arrives instead of handling everything in one go.
 */
class BatchProcessor {
  /**
   * Create a batch processor.
   *
   * fetchRecordsFn is called with (context, pageSize, offset) and must return (or
   * resolve to) an array of records; an empty array means the input is exhausted.
   * Each non-empty page is passed to processRecordsFn(context, records) and awaited
   * before the next page is fetched.
   * When runOnce is false, the batches will start over from the beginning at page 0 offset 0.
   * The optional context is passed to fetchRecordsFn and processRecordsFn, which is
   * useful for sharing database or http connections, etc.
   *
   * @param {Object} configuration
   * @param {Function} configuration.fetchRecordsFn - (context, pageSize, offset) => records or Promise of records.
   * @param {Function} configuration.processRecordsFn - (context, records) => any (awaited).
   * @param {number} [configuration.pageSize=10] - Page size passed to fetchRecordsFn; also the offset step.
   * @param {number} [configuration.offset=0] - Offset used for the first fetch.
   * @param {number} [configuration.sleepMs=0] - Milliseconds to sleep between fetch calls (0 disables).
   * @param {boolean} [configuration.runOnce=true] - Stop automatically after one full pass over the input.
   * @param {Object} [configuration.context] - Passed as the first argument to both functions.
   * @throws {Error} If fetchRecordsFn or processRecordsFn is not a function.
   */
  constructor({
    fetchRecordsFn,
    processRecordsFn,
    pageSize = 10,
    offset = 0,
    sleepMs = 0,
    runOnce = true,
    context,
  } = {}) {
    if (typeof fetchRecordsFn !== 'function') {
      throw new Error('Must pass a function that fetches records');
    }
    if (typeof processRecordsFn !== 'function') {
      throw new Error('Must pass a function that processes records');
    }
    this.fetchRecordsFn = fetchRecordsFn;
    this.processRecordsFn = processRecordsFn;
    this.pageSize = pageSize;
    this.offset = offset;
    this.sleep = sleepMs;
    this.runOnce = runOnce;
    this.stopped = true;
    this.currentPage = 0;
    // context passed to fetchRecordsFn and processRecordsFn function
    this.context = context;
    // Loop bookkeeping: lets start() resume an in-flight loop instead of
    // launching a second one that would re-read the same offsets.
    this._running = false;
    this._runPromise = null;
  }

  /**
   * Hand one page of records to the user-supplied processRecordsFn.
   * @param {*} context - The configured context.
   * @param {Array} records - The page returned by fetchRecordsFn.
   * @returns {*} Whatever processRecordsFn returns (run() awaits it).
   */
  processRecords(context, records) {
    return this.processRecordsFn(context, records);
  }

  /**
   * Resolve after the given number of milliseconds.
   * @param {number} [ms=this.sleep] - Delay; defaults to the configured sleepMs.
   * @returns {Promise<void>}
   */
  _sleep(ms = this.sleep) {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Fetch-and-process loop; keeps going until `stopped` becomes true.
   * Prefer start(), which also prevents concurrent loops.
   * If fetchRecordsFn or processRecordsFn throws, the returned promise rejects and
   * offset/currentPage are left pointing at the page that failed.
   * @returns {Promise<void>}
   */
  async run() {
    this._running = true;
    try {
      while (!this.stopped) {
        const records = await this.fetchRecordsFn(this.context, this.pageSize, this.offset);
        if (records.length > 0) {
          await this.processRecords(this.context, records);
          // Advance from the current offset so a non-zero starting offset is honoured.
          this.offset += this.pageSize;
          this.currentPage += 1;
        } else {
          // If there are no records left we processed all of them
          this.offset = 0;
          this.currentPage = 0;
          if (this.runOnce) {
            this.stop();
          }
        }
        // Throttle between every pair of fetch calls, including the restart after an
        // exhausted pass (otherwise runOnce=false would re-fetch in a tight loop).
        // No point sleeping once a stop has been requested.
        if (this.sleep && !this.stopped) {
          await this._sleep();
        }
      }
    } finally {
      // Cleared synchronously on loop exit so a later start() launches a fresh loop.
      this._running = false;
    }
  }

  /**
   * Start (or resume) processing the batches.
   * If a loop is still in flight (including one asked to stop that has not yet
   * exited), it is resumed rather than starting a second concurrent loop.
   * @returns {Promise<void>} Settles when the loop exits; rejects if a fetch/process call throws.
   */
  start() {
    this.stopped = false;
    if (!this._running) {
      this._runPromise = this.run();
    }
    return this._runPromise;
  }

  /**
   * Optionally stop the batches.
   * Note: If runOnce is true, batch processor will stop once it exhausts all data.
   * If runOnce is false, it is up to you to stop.
   *
   * Setting `stopped` does not interrupt an in-flight fetchRecordsFn/processRecordsFn
   * call; the loop exits the next time it checks the flag. The returned promise only
   * resolves on the next tick and does NOT wait for that - await the promise returned
   * by start() to know when the loop has actually exited.
   * @returns {Promise<void>}
   */
  stop() {
    this.stopped = true;
    return new Promise((resolve) => {
      process.nextTick(resolve);
    });
  }
}
// CommonJS export: the module's value is the BatchProcessor class itself.
module.exports = BatchProcessor;