From 1e83e644ded8cd9d4a229ce0d1ac46679f8b0250 Mon Sep 17 00:00:00 2001 From: Andris Reinman Date: Tue, 5 Mar 2024 17:19:16 +0200 Subject: [PATCH] fix(fetch): use batches when fetching message entries for indexing --- lib/connection.js | 21 +++- lib/consts.js | 5 + lib/mailbox.js | 274 ++++++++++++++++++++++++++-------------------- lib/tools.js | 6 +- 4 files changed, 185 insertions(+), 121 deletions(-) diff --git a/lib/connection.js b/lib/connection.js index fda520803..c3a5d6d91 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -652,7 +652,26 @@ class Connection { this.logger.info({ msg: 'Connection closed', type: 'imapClient', account: this.account }); try { - for (let mailbox of this.mailboxes) { + for (let [, mailbox] of this.mailboxes) { + if (mailbox.syncing) { + try { + // set failure flag + await this.redis.hSetNew( + this.getAccountKey(), + 'syncError', + JSON.stringify({ + path: mailbox.path, + time: new Date().toISOString(), + error: { + error: 'Connection closed unexpectedly' + } + }) + ); + } catch (err) { + // ignore + } + } + if (mailbox.selected) { // should be at most one though await mailbox.onClose(); diff --git a/lib/consts.js b/lib/consts.js index e6d63c740..4dc8052bb 100644 --- a/lib/consts.js +++ b/lib/consts.js @@ -164,6 +164,11 @@ module.exports = { MIME_BOUNDARY_PREFIX: '----=_Part', + FETCH_RETRY_INTERVAL: 1000, + FETCH_RETRY_EXPONENTIAL: 2, + FETCH_RETRY_MAX: 60 * 1000, + MAX_FETCH_RANGE: 1000, + generateWebhookTable() { let entries = []; diff --git a/lib/mailbox.js b/lib/mailbox.js index ecfcaf7ae..a6777f2fa 100644 --- a/lib/mailbox.js +++ b/lib/mailbox.js @@ -1,7 +1,16 @@ 'use strict'; const crypto = require('crypto'); -const { serialize, unserialize, compareExisting, normalizePath, download, filterEmptyObjectValues, validUidValidity } = require('./tools'); +const { + serialize, + unserialize, + compareExisting, + normalizePath, + download, + filterEmptyObjectValues, + validUidValidity, + calculateFetchBackoff +} = require('./tools'); const msgpack = require('msgpack5')(); const he = require('he'); const libmime = require('libmime'); @@ -28,17 +37,29 @@ const { EMAIL_COMPLAINT_NOTIFY, REDIS_PREFIX, MAX_INLINE_ATTACHMENT_SIZE, - MAX_ALLOWED_DOWNLOAD_SIZE + MAX_ALLOWED_DOWNLOAD_SIZE, + MAX_FETCH_RANGE } = require('./consts'); // Do not check for flag updates using full sync more often than this value const FULL_SYNC_DELAY = 30 * 60 * 1000; -const FETCH_RETRY_INTERVAL = 1000; -const FETCH_RETRY_EXPONENTIAL = 2; -const FETCH_RETRY_MAX = 60 * 1000; - -const calculateFetchBackoff = attempt => Math.min(FETCH_RETRY_MAX, FETCH_RETRY_INTERVAL * FETCH_RETRY_EXPONENTIAL ** attempt); +function getFetchRange(totalMessages, lastRange) { + let lastEndMarker = lastRange ? lastRange.split(':').pop() : false; + if (lastEndMarker === '*') { + return false; + } + let lastUid = lastRange ? Number(lastEndMarker) : 0; + let startUid = lastUid + 1; + if (startUid > totalMessages) { + return false; + } + let endMarker = startUid + MAX_FETCH_RANGE - 1; + if (endMarker >= totalMessages) { + endMarker = '*'; + } + return `${startUid}:${endMarker}`; +} class Mailbox { constructor(connection, entry) { @@ -63,6 +84,8 @@ class Mailbox { this.runPartialSyncTimer = false; this.synced = false; + + this.syncing = false; } getMailboxStatus(connectionClient) { @@ -445,6 +468,7 @@ class Mailbox { let lock = await this.getMailboxLock(); this.connection.syncing = true; + this.syncing = true; try { if (!this.connection.imapClient) { throw new Error('IMAP connection not available'); @@ -542,11 +566,11 @@ class Mailbox { return; } - let fetchRetryDelay = calculateFetchBackoff(++fetchRetryCount); + const fetchRetryDelay = calculateFetchBackoff(++fetchRetryCount); this.logger.error({ msg: `FETCH failed, retrying in ${Math.round(fetchRetryDelay / 1000)}s` }); await new Promise(r => setTimeout(r, fetchRetryDelay)); - if (!imapClient.usabled) { + if (!imapClient.usable) { // nothing to do here, connection closed this.logger.error({ msg: `FETCH failed, connection already closed, not retrying` }); return; @@ -631,6 +655,7 @@ class Mailbox { } finally { lock.release(); this.connection.syncing = false; + this.syncing = false; } } @@ -1637,6 +1662,7 @@ class Mailbox { let lock = await this.getMailboxLock(); this.connection.syncing = true; + this.syncing = true; try { let mailboxStatus = this.getMailboxStatus(); @@ -1653,133 +1679,144 @@ class Mailbox { }; if (mailboxStatus.messages) { - // only fetch messages if there is some - let fetchCompleted = false; - let fetchRetryCount = 0; - const imapClient = this.connection.imapClient; - while (!fetchCompleted) { - try { - for await (let messageData of imapClient.fetch(range, fields, opts)) { - if (!messageData) { - this.logger.debug({ msg: 'Empty FETCH response', code: 'empty_fetch', query: { range, fields, opts } }); - responseCounters.empty++; - continue; - } + // only fetch messages if there are some + let localRange; + let lastHighestUid = 0; + while ((localRange = getFetchRange(mailboxStatus.messages, localRange))) { + let fetchCompleted = false; + let fetchRetryCount = 0; + const imapClient = this.connection.imapClient; + while (!fetchCompleted) { + try { + for await (let messageData of imapClient.fetch(localRange, fields, opts)) { + if (!messageData) { + this.logger.debug({ msg: 'Empty FETCH response', code: 'empty_fetch', query: { range: localRange, fields, opts } }); + responseCounters.empty++; + continue; + } - if (!messageData.uid || (fields.flags && !messageData.flags)) { - // TODO: support partial responses - // For now, without UID or FLAGS there's nothing to do - this.logger.debug({ - msg: 'Partial FETCH response', - code: 'partial_fetch', - query: { range, fields, opts }, - responseKeys: Object.keys(messageData) - }); - responseCounters.partial++; - continue; - } + if (!messageData.uid || (fields.flags && !messageData.flags)) { + // TODO: support partial responses + // For now, without UID or FLAGS there's nothing to do + this.logger.debug({ + msg: 'Partial FETCH response', + code: 'partial_fetch', + query: { range: localRange, fields, opts }, + responseKeys: Object.keys(messageData) + }); + responseCounters.partial++; + continue; + } - responseCounters.messages++; + if (messageData.uid <= lastHighestUid) { + // already processed in the previous batch + // probably an older email was deleted which shifted message entries + continue; + } + lastHighestUid = messageData.uid; - if (fields.internalDate && !messageData.internalDate) { - this.logger.debug({ - msg: 'Missing INTERNALDATE', - code: 'fetch_date_missing', - query: { range, fields, opts }, - responseKeys: Object.keys(messageData) - }); - } + responseCounters.messages++; - // ignore Recent flag - messageData.flags.delete('\\Recent'); + if (fields.internalDate && !messageData.internalDate) { + this.logger.debug({ + msg: 'Missing INTERNALDATE', + code: 'fetch_date_missing', + query: { range: localRange, fields, opts }, + responseKeys: Object.keys(messageData) + }); + } - if (messageData.seq > seqMax) { - seqMax = messageData.seq; - } + // ignore Recent flag + messageData.flags.delete('\\Recent'); - let storedMessage = await this.entryListGet(messageData.uid, { uid: true }); - if (!storedMessage) { - // new! - let seq = await this.entryListSet(messageData); - if (seq) { - await this.connection.redis.zadd( - this.getNotificationsKey(), - messageData.uid, - JSON.stringify({ - uid: messageData.uid, - flags: messageData.flags, - internalDate: - (messageData.internalDate && - typeof messageData.internalDate.toISOString === 'function' && - messageData.internalDate.toISOString()) || - null - }) - ); - } - } else { - let diff = storedMessage.seq - messageData.seq; - if (diff) { - this.logger.trace({ msg: 'Deleted range', inloop: true, diff, start: messageData.seq }); - } - for (let i = diff - 1; i >= 0; i--) { - let seq = messageData.seq + i; - let deletedEntry = await this.entryListExpunge(seq); - if (deletedEntry) { - await this.processDeleted(deletedEntry); - } + if (messageData.seq > seqMax) { + seqMax = messageData.seq; } - if ((changes = compareExisting(storedMessage.entry, messageData))) { + let storedMessage = await this.entryListGet(messageData.uid, { uid: true }); + if (!storedMessage) { + // new! let seq = await this.entryListSet(messageData); if (seq) { - await this.processChanges(messageData, changes); + await this.connection.redis.zadd( + this.getNotificationsKey(), + messageData.uid, + JSON.stringify({ + uid: messageData.uid, + flags: messageData.flags, + internalDate: + (messageData.internalDate && + typeof messageData.internalDate.toISOString === 'function' && + messageData.internalDate.toISOString()) || + null + }) + ); + } + } else { + let diff = storedMessage.seq - messageData.seq; + if (diff) { + this.logger.trace({ msg: 'Deleted range', inloop: true, diff, start: messageData.seq }); + } + for (let i = diff - 1; i >= 0; i--) { + let seq = messageData.seq + i; + let deletedEntry = await this.entryListExpunge(seq); + if (deletedEntry) { + await this.processDeleted(deletedEntry); + } + } + + if ((changes = compareExisting(storedMessage.entry, messageData))) { + let seq = await this.entryListSet(messageData); + if (seq) { + await this.processChanges(messageData, changes); + } } } } - } - try { - // clear failure flag - await this.connection.redis.hdel(this.connection.getAccountKey(), 'syncError'); + try { + // clear failure flag + await this.connection.redis.hdel(this.connection.getAccountKey(), 'syncError'); + } catch (err) { + // ignore + } + fetchCompleted = true; } catch (err) { - // ignore - } - fetchCompleted = true; - } catch (err) { - if (!imapClient.usable) { - // nothing to do here, connection closed - this.logger.error({ msg: `FETCH failed, connection already closed, not retrying` }); - return; - } + if (!imapClient.usable) { + // nothing to do here, connection closed + this.logger.error({ msg: `FETCH failed, connection already closed, not retrying` }); + return; + } - try { - // set failure flag - await this.connection.redis.hSetExists( - this.connection.getAccountKey(), - 'syncError', - JSON.stringify({ - path: this.path, - time: new Date().toISOString(), - error: { - error: err.message, - responseStatus: err.responseStatus, - responseText: err.responseText - } - }) - ); - } catch (err) { - // ignore - } + try { + // set failure flag + await this.connection.redis.hSetExists( + this.connection.getAccountKey(), + 'syncError', + JSON.stringify({ + path: this.path, + time: new Date().toISOString(), + error: { + error: err.message, + responseStatus: err.responseStatus, + responseText: err.responseText + } + }) + ); + } catch (err) { + // ignore + } - // retry - let fetchRetryDelay = calculateFetchBackoff(++fetchRetryCount); - this.logger.error({ msg: `FETCH failed, retrying in ${Math.round(fetchRetryDelay / 1000)}s` }); - await new Promise(r => setTimeout(r, fetchRetryDelay)); + // retry + const fetchRetryDelay = calculateFetchBackoff(++fetchRetryCount); + this.logger.error({ msg: `FETCH failed, retrying in ${Math.round(fetchRetryDelay / 1000)}s` }); + await new Promise(r => setTimeout(r, fetchRetryDelay)); - if (!imapClient.usable) { - // nothing to do here, connection closed - this.logger.error({ msg: `FETCH failed, connection already closed, not retrying` }); - return; + if (!imapClient.usable) { + // nothing to do here, connection closed + this.logger.error({ msg: `FETCH failed, connection already closed, not retrying` }); + return; + } } } } @@ -1888,6 +1925,7 @@ class Mailbox { } } finally { this.connection.syncing = false; + this.syncing = false; lock.release(); } } diff --git a/lib/tools.js b/lib/tools.js index befc25f0e..5f0675429 100644 --- a/lib/tools.js +++ b/lib/tools.js @@ -29,7 +29,7 @@ const uuid = require('uuid'); const mimeTypes = require('nodemailer/lib/mime-funcs/mime-types'); const { v3: murmurhash } = require('murmurhash'); const { compare: compareVersions, validate: validateVersion } = require('compare-versions'); -const { REDIS_PREFIX, TLS_DEFAULTS, FETCH_TIMEOUT, MAX_FORM_TTL } = require('./consts'); +const { REDIS_PREFIX, TLS_DEFAULTS, FETCH_TIMEOUT, MAX_FORM_TTL, FETCH_RETRY_INTERVAL, FETCH_RETRY_EXPONENTIAL, FETCH_RETRY_MAX } = require('./consts'); const ipaddr = require('ipaddr.js'); const bullmqPackage = require('bullmq/package.json'); const v8 = require('node:v8'); @@ -1793,5 +1793,7 @@ MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEV3QUiYsp13nD9suD1/ZkEXnuMoSg Buffer.from(`ee@${packageData.version}`).toString('base64url'), randomBytes(8).toString('base64url') ].join('_'); - } + }, + + calculateFetchBackoff: attempt => Math.min(FETCH_RETRY_MAX, FETCH_RETRY_INTERVAL * FETCH_RETRY_EXPONENTIAL ** attempt) };