Skip to content

Commit

Permalink
fix(throttling): automatically retry throttled FETCH commands a few t…
Browse files Browse the repository at this point in the history
…imes
  • Loading branch information
andris9 committed Oct 26, 2023
1 parent 2534169 commit 07a9aea
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 148 deletions.
287 changes: 152 additions & 135 deletions lib/commands/fetch.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,180 +13,197 @@ module.exports = async (connection, range, query, options) => {

let mailbox = connection.mailbox;

let messages = {
count: 0,
list: []
};

const commandKey = connection.capabilities.has('BINARY') && options.binary && !connection.disableBinary ? 'BINARY' : 'BODY';

let response;
try {
let attributes = [{ type: 'SEQUENCE', value: (range || '*').toString() }];
let retryCount = 0;
while (retryCount < 4) {
let messages = {
count: 0,
list: []
};

let queryStructure = [];
let response;
try {
let attributes = [{ type: 'SEQUENCE', value: (range || '*').toString() }];

let setBodyPeek = (attributes, partial) => {
let bodyPeek = {
type: 'ATOM',
value: `${commandKey}.PEEK`,
section: [],
partial
};
let queryStructure = [];

if (Array.isArray(attributes)) {
attributes.forEach(attribute => {
bodyPeek.section.push(attribute);
});
} else if (attributes) {
bodyPeek.section.push(attributes);
}
let setBodyPeek = (attributes, partial) => {
let bodyPeek = {
type: 'ATOM',
value: `${commandKey}.PEEK`,
section: [],
partial
};

if (Array.isArray(attributes)) {
attributes.forEach(attribute => {
bodyPeek.section.push(attribute);
});
} else if (attributes) {
bodyPeek.section.push(attributes);
}

queryStructure.push(bodyPeek);
};
queryStructure.push(bodyPeek);
};

['all', 'fast', 'full', 'uid', 'flags', 'bodyStructure', 'envelope', 'internalDate'].forEach(key => {
if (query[key]) {
queryStructure.push({ type: 'ATOM', value: key.toUpperCase() });
}
});
['all', 'fast', 'full', 'uid', 'flags', 'bodyStructure', 'envelope', 'internalDate'].forEach(key => {
if (query[key]) {
queryStructure.push({ type: 'ATOM', value: key.toUpperCase() });
}
});

if (query.size) {
queryStructure.push({ type: 'ATOM', value: 'RFC822.SIZE' });
}
if (query.size) {
queryStructure.push({ type: 'ATOM', value: 'RFC822.SIZE' });
}

if (query.source) {
let partial;
if (typeof query.source === 'object' && (query.source.start || query.source.maxLength)) {
partial = [Number(query.source.start) || 0];
if (query.source.maxLength && !isNaN(query.source.maxLength)) {
partial.push(Number(query.source.maxLength));
if (query.source) {
let partial;
if (typeof query.source === 'object' && (query.source.start || query.source.maxLength)) {
partial = [Number(query.source.start) || 0];
if (query.source.maxLength && !isNaN(query.source.maxLength)) {
partial.push(Number(query.source.maxLength));
}
}
queryStructure.push({ type: 'ATOM', value: `${commandKey}.PEEK`, section: [], partial });
}
queryStructure.push({ type: 'ATOM', value: `${commandKey}.PEEK`, section: [], partial });
}

// if possible, always request for unique email id
if (connection.capabilities.has('OBJECTID')) {
queryStructure.push({ type: 'ATOM', value: 'EMAILID' });
} else if (connection.capabilities.has('X-GM-EXT-1')) {
queryStructure.push({ type: 'ATOM', value: 'X-GM-MSGID' });
}

if (query.threadId) {
// if possible, always request for unique email id
if (connection.capabilities.has('OBJECTID')) {
queryStructure.push({ type: 'ATOM', value: 'THREADID' });
queryStructure.push({ type: 'ATOM', value: 'EMAILID' });
} else if (connection.capabilities.has('X-GM-EXT-1')) {
queryStructure.push({ type: 'ATOM', value: 'X-GM-THRID' });
queryStructure.push({ type: 'ATOM', value: 'X-GM-MSGID' });
}
}

if (query.labels) {
if (connection.capabilities.has('X-GM-EXT-1')) {
queryStructure.push({ type: 'ATOM', value: 'X-GM-LABELS' });
if (query.threadId) {
if (connection.capabilities.has('OBJECTID')) {
queryStructure.push({ type: 'ATOM', value: 'THREADID' });
} else if (connection.capabilities.has('X-GM-EXT-1')) {
queryStructure.push({ type: 'ATOM', value: 'X-GM-THRID' });
}
}
}

// always ask for modseq if possible
if (connection.enabled.has('CONDSTORE') && !mailbox.noModseq) {
queryStructure.push({ type: 'ATOM', value: 'MODSEQ' });
}
if (query.labels) {
if (connection.capabilities.has('X-GM-EXT-1')) {
queryStructure.push({ type: 'ATOM', value: 'X-GM-LABELS' });
}
}

// always make sure to include UID in the request as well even though server might auto-add it itself
if (!query.uid) {
queryStructure.push({ type: 'ATOM', value: 'UID' });
}
// always ask for modseq if possible
if (connection.enabled.has('CONDSTORE') && !mailbox.noModseq) {
queryStructure.push({ type: 'ATOM', value: 'MODSEQ' });
}

if (query.headers) {
if (Array.isArray(query.headers)) {
setBodyPeek([{ type: 'ATOM', value: 'HEADER.FIELDS' }, query.headers.map(header => ({ type: 'ATOM', value: header }))]);
} else {
setBodyPeek({ type: 'ATOM', value: 'HEADER' });
// always make sure to include UID in the request as well even though server might auto-add it itself
if (!query.uid) {
queryStructure.push({ type: 'ATOM', value: 'UID' });
}
}

if (query.bodyParts && query.bodyParts.length) {
query.bodyParts.forEach(part => {
if (!part) {
return;
if (query.headers) {
if (Array.isArray(query.headers)) {
setBodyPeek([{ type: 'ATOM', value: 'HEADER.FIELDS' }, query.headers.map(header => ({ type: 'ATOM', value: header }))]);
} else {
setBodyPeek({ type: 'ATOM', value: 'HEADER' });
}
let key;
let partial;
if (typeof part === 'object') {
if (!part.key || typeof part.key !== 'string') {
}

if (query.bodyParts && query.bodyParts.length) {
query.bodyParts.forEach(part => {
if (!part) {
return;
}
key = part.key.toUpperCase();
if (part.start || part.maxLength) {
partial = [Number(part.start) || 0];
if (part.maxLength && !isNaN(part.maxLength)) {
partial.push(Number(part.maxLength));
let key;
let partial;
if (typeof part === 'object') {
if (!part.key || typeof part.key !== 'string') {
return;
}
key = part.key.toUpperCase();
if (part.start || part.maxLength) {
partial = [Number(part.start) || 0];
if (part.maxLength && !isNaN(part.maxLength)) {
partial.push(Number(part.maxLength));
}
}
} else if (typeof part === 'string') {
key = part.toUpperCase();
} else {
return;
}
} else if (typeof part === 'string') {
key = part.toUpperCase();
} else {
return;
}

setBodyPeek({ type: 'ATOM', value: key }, partial);
});
}
setBodyPeek({ type: 'ATOM', value: key }, partial);
});
}

if (queryStructure.length === 1) {
queryStructure = queryStructure.pop();
}
if (queryStructure.length === 1) {
queryStructure = queryStructure.pop();
}

attributes.push(queryStructure);
attributes.push(queryStructure);

if (options.changedSince && connection.enabled.has('CONDSTORE') && !mailbox.noModseq) {
let changedSinceArgs = [
{
type: 'ATOM',
value: 'CHANGEDSINCE'
},
{
type: 'ATOM',
value: options.changedSince.toString()
}
];

if (options.changedSince && connection.enabled.has('CONDSTORE') && !mailbox.noModseq) {
let changedSinceArgs = [
{
type: 'ATOM',
value: 'CHANGEDSINCE'
},
{
type: 'ATOM',
value: options.changedSince.toString()
if (options.uid && connection.enabled.has('QRESYNC')) {
changedSinceArgs.push({
type: 'ATOM',
value: 'VANISHED'
});
}
];

if (options.uid && connection.enabled.has('QRESYNC')) {
changedSinceArgs.push({
type: 'ATOM',
value: 'VANISHED'
});
attributes.push(changedSinceArgs);
}

attributes.push(changedSinceArgs);
}

response = await connection.exec(options.uid ? 'UID FETCH' : 'FETCH', attributes, {
untagged: {
FETCH: async untagged => {
messages.count++;
let formatted = await formatMessageResponse(untagged, mailbox);
if (typeof options.onUntaggedFetch === 'function') {
await new Promise((resolve, reject) => {
options.onUntaggedFetch(formatted, err => {
if (err) {
reject(err);
} else {
resolve();
}
response = await connection.exec(options.uid ? 'UID FETCH' : 'FETCH', attributes, {
untagged: {
FETCH: async untagged => {
messages.count++;
let formatted = await formatMessageResponse(untagged, mailbox);
if (typeof options.onUntaggedFetch === 'function') {
await new Promise((resolve, reject) => {
options.onUntaggedFetch(formatted, err => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
});
} else {
messages.list.push(formatted);
} else {
messages.list.push(formatted);
}
}
}
});

response.next();
return messages;
} catch (err) {
if (err.code === 'ETHROTTLE') {
// retrying
connection.log.warn({
msg: 'Retrying throttled request',
cid: connection.id,
code: err.code,
response: err.responseText,
throttleReset: err.throttleReset,
retryCount
});
retryCount++;
continue;
}
});

response.next();
return messages;
} catch (err) {
connection.log.warn({ err, cid: connection.id });
return false;
connection.log.warn({ err, cid: connection.id });
return false;
}
}
};
2 changes: 2 additions & 0 deletions lib/commands/idle.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ module.exports = async (connection, maxIdleTime) => {
return;
}

maxIdleTime = 3000;

if (connection.capabilities.has('IDLE')) {
let idleTimer;
let stillIdling = false;
Expand Down
35 changes: 22 additions & 13 deletions lib/imap-flow.js
Original file line number Diff line number Diff line change
Expand Up @@ -562,25 +562,31 @@ class ImapFlow extends EventEmitter {
if (txt) {
err.responseText = txt;

let throttleDelay = false;

// MS365 throttling
// tag BAD Request is throttled. Suggested Backoff Time: 92415 milliseconds
if (/Request is throttled/i.test(txt) && /Backoff Time/i.test(txt)) {
let throttlingMatch = txt.match(/Backoff Time[:=\s]+(\d+)/i);
if (throttlingMatch && throttlingMatch[1] && !isNaN(throttlingMatch[1])) {
err.code = 'ETHROTTLE';
err.throttleReset = Number(throttlingMatch[1]);

let delayResponse = err.throttleReset;
if (delayResponse > 5 * 60 * 1000) {
// max delay cap
delayResponse = 5 * 60 * 1000;
}
if (delayResponse) {
this.log.warn({ msg: 'Throttling detected', err, cid: this.id, delayResponse });
await new Promise(r => setTimeout(r, delayResponse));
}
throttleDelay = Number(throttlingMatch[1]);
}
}

// Wait and return a throttling error
if (throttleDelay) {
err.code = 'ETHROTTLE';
err.throttleReset = throttleDelay;

let delayResponse = throttleDelay;
if (delayResponse > 5 * 60 * 1000) {
// max delay cap
delayResponse = 5 * 60 * 1000;
}

this.log.warn({ msg: 'Throttling detected', err, cid: this.id, throttleDelay, delayResponse });
await new Promise(r => setTimeout(r, delayResponse));
}
}

request.reject(err);
Expand Down Expand Up @@ -2736,7 +2742,10 @@ class ImapFlow extends EventEmitter {

let result = await handler(this, ...args);

this.autoidle();
if (command !== 'IDLE') {
// do not autostart IDLE, if IDLE itself was stopped
this.autoidle();
}

return result;
}
Expand Down

0 comments on commit 07a9aea

Please sign in to comment.