index.js
const { S3 } = require('aws-sdk');
const { PassThrough } = require('stream');
const Archiver = require('archiver');
const argv = require('minimist')(process.argv.slice(2));
// get inputs from the user
const bucket = argv['b'];
const directory = argv['d'];
const zipFileName = argv['z'];
const deleteFiles = argv['r'] || false;
// check the inputs
if (!bucket || !directory || !zipFileName) {
  console.error('Usage: npm run start -- -b <bucket> -d <directory> -z <zipFileName> [-r]');
  process.exit(1);
}
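// Example invocation (bucket and prefix names are placeholders):
//   node index.js -b my-bucket -d exports/2023/ -z archive.zip -r
// Note that the -d prefix is reused as the key prefix for the zip itself,
// so it should normally end with a trailing '/'.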
const uploadParams = {
  Bucket: bucket,
  ContentType: 'application/zip',
  Key: directory + zipFileName,
};
let filesList = [];
const uploadedFileParts = [];
const failedFileParts = [];
const streamPassThrough = new PassThrough();
const s3 = new S3();
// Minimum part size for the multipart upload, in bytes. S3 requires every
// part except the last to be at least 5 MiB; 10 MiB keeps the part count low.
const minUploadSize = 10485760; // 10 MiB
let totalFiles = 0;
let archivedFiles = 0;
let upload = null;
const archive = Archiver('zip', {});
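// Overall flow: list every object under the prefix, stream each one into a
// zip archive, pipe the archive through the PassThrough stream, and slice
// that stream into fixed-size parts for a single S3 multipart upload.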
async function main() {
  archive.pipe(streamPassThrough);
  archive
    .on('error', (err) => {
      console.error(`ZIP ERROR: ${err}`);
    })
    .on('progress', (progress) => {
      archivedFiles = progress.entries.processed;
      console.log(`Archived ${archivedFiles} of ${totalFiles} files`);
      // Once every listed file is in the archive, write the zip trailer
      if (archivedFiles === totalFiles) {
        console.log('All files archived');
        archive.finalize();
      }
    });
  console.log(`Getting files from ${bucket}/${directory}`);
  await getFilesList();
  console.log(`Found ${filesList.length} files`);
  if (filesList.length === 0) {
    console.log('No files found, nothing to archive');
    return;
  }
  console.log('Creating upload request');
  upload = await createUpload();
  console.log(`Upload created with ID: ${upload.UploadId}`);
  // Download each file and append it to the archive. Awaiting keeps the
  // downloads sequential and surfaces getObject/deleteObject errors here.
  for (const fileKey of filesList) {
    await processFileObject(fileKey);
  }
  console.log('Starting upload');
  await startUpload();
  console.log('Upload done');
  console.log('Retrying failed parts');
  await retryFailedParts();
  console.log('Retry done');
  return completeUpload();
}
async function getFilesList(marker) {
  const params = {
    Bucket: bucket,
    Prefix: directory,
    ContinuationToken: marker,
  };
  try {
    // List one page of objects under the prefix
    const data = await s3.listObjectsV2(params).promise();
    totalFiles += data.Contents.length;
    filesList = filesList.concat(data.Contents.map((file) => file.Key));
    // Recurse while the listing is paginated
    if (data.IsTruncated) {
      await getFilesList(data.NextContinuationToken);
    }
  } catch (err) {
    console.error('Error getting object list', err);
  }
}
async function processFileObject(fileKey) {
  // Download the object into memory and append it to the archive
  const object = await s3.getObject({ Bucket: bucket, Key: fileKey }).promise();
  archive.append(object.Body, { name: fileKey });
  // Optionally delete the source object once it has been archived
  if (deleteFiles) {
    await s3.deleteObject({ Bucket: bucket, Key: fileKey }).promise();
  }
}
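// Note: getObject buffers each file fully in memory before appending it.
// For very large objects, s3.getObject(params).createReadStream() could be
// passed to archive.append instead to keep memory usage flat; the buffered
// approach above is kept for simplicity.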
function createUpload() {
  // Start the multipart upload. The response looks like:
  //   {
  //     Key: "<your key>",
  //     Bucket: "<your bucket>",
  //     UploadId: "ibZBv_75gd9r8lH_gqXatLdxMVpAlj6ZQjEs.Sjng--" // opaque ID
  //   }
  // and the UploadId must accompany every later part upload.
  return s3.createMultipartUpload(uploadParams).promise();
}
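// Note: if the script fails partway through, the multipart upload stays open
// on S3 and its uploaded parts continue to accrue storage charges. Calling
// s3.abortMultipartUpload with the same Bucket/Key/UploadId, or a bucket
// lifecycle rule for incomplete multipart uploads, would clean it up; that
// cleanup is not wired in here.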
async function startUpload() {
  return new Promise((resolve, reject) => {
    // Accumulate the zip stream in a buffer and peel off fixed-size parts
    let buffer = Buffer.alloc(0);
    let partNumber = 1;
    const pendingUploads = [];
    streamPassThrough
      .on('error', reject)
      .on('data', (chunk) => {
        buffer = Buffer.concat([buffer, chunk]);
        // Upload a part whenever the buffer reaches the minimum part size
        while (buffer.byteLength >= minUploadSize) {
          const partBuffer = buffer.slice(0, minUploadSize);
          buffer = buffer.slice(minUploadSize);
          pendingUploads.push(
            uploadPart(partBuffer, partNumber)
              .then((part) => uploadedFileParts.push(part))
              .catch((failedPart) => failedFileParts.push(failedPart))
          );
          partNumber++;
        }
      })
      .on('end', () => {
        // 'end' fires after archive.finalize() and full consumption of the
        // stream, so whatever is left is the (possibly short) final part
        console.log('Zip stream complete');
        if (buffer.byteLength > 0) {
          pendingUploads.push(
            uploadPart(buffer, partNumber)
              .then((part) => uploadedFileParts.push(part))
              .catch((failedPart) => failedFileParts.push(failedPart))
          );
        }
        // Resolve only after every in-flight part has settled
        Promise.all(pendingUploads).then(() => resolve());
      });
  });
}
async function retryFailedParts() {
  // Keep retrying until every part has succeeded. Each attempt is awaited,
  // so the loop yields to the event loop between attempts.
  while (failedFileParts.length > 0) {
    const failedPart = failedFileParts.shift();
    try {
      const part = await uploadPart(failedPart.buffer, failedPart.PartNumber);
      uploadedFileParts.push(part);
    } catch (stillFailedPart) {
      // Put the part back so it is retried on a later pass
      failedFileParts.push(stillFailedPart);
    }
  }
}
function uploadPart(buffer, partNumber) {
  const partParams = Object.assign({}, uploadParams, {
    Body: buffer,
    UploadId: upload.UploadId,
    PartNumber: partNumber,
  });
  // uploadPart does not accept ContentType; it was set on the create call
  delete partParams.ContentType;
  console.log(`Uploading part ${partNumber}`);
  return s3.uploadPart(partParams).promise()
    .then((data) => {
      console.log(`Uploaded part ${partNumber}`);
      return { PartNumber: partNumber, ETag: data.ETag };
    })
    .catch((err) => {
      console.error(`Error uploading part ${partNumber}`, err);
      // Reject with everything needed to retry this exact part later
      return Promise.reject({ PartNumber: partNumber, error: err, buffer: buffer });
    });
}
async function completeUpload() {
  // S3 requires the parts list in ascending PartNumber order
  uploadedFileParts.sort((a, b) => a.PartNumber - b.PartNumber);
  const params = Object.assign({}, uploadParams, {
    UploadId: upload.UploadId,
    MultipartUpload: {
      Parts: uploadedFileParts,
    },
  });
  // completeMultipartUpload does not accept ContentType either
  delete params.ContentType;
  return s3.completeMultipartUpload(params).promise();
}
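// Note: completeMultipartUpload stitches the uploaded parts together on the
// S3 side; the resolved response includes the final object's Location,
// Bucket, Key and ETag.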
main()
  .then(() => {
    console.log('Done');
  })
  .catch((err) => {
    console.error(err);
    // Exit non-zero so calling scripts can detect the failure
    process.exitCode = 1;
  });