From eae9b0dd94ce09b789d5a24b133413311ba9ee58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EF=BD=81=EF=BD=99=EF=BD=95=EF=BD=8D=EF=BD=89=C2=A0=20?= =?UTF-8?q?=EF=BD=99=EF=BD=95?= Date: Thu, 22 Jun 2017 17:43:28 +0000 Subject: [PATCH] Add requestUtil.bufferedPut to limit concurrency of put Deprecate SEND_SYNC_RECORDS category to be optional and derived from each record. Required for #112 --- client/constants/messages.js | 2 +- client/constants/proto.js | 7 ++++++ client/recordUtil.js | 18 ++++++++++++++++ client/requestUtil.js | 21 ++++++++++++++---- client/sync.js | 7 +++--- lib/promiseHelper.js | 42 ++++++++++++++++++++++++++++++++++++ test/client/recordUtil.js | 14 ++++++++++++ test/client/requestUtil.js | 16 ++++++-------- test/promiseHelper.js | 34 +++++++++++++++++++++++++++++ 9 files changed, 143 insertions(+), 18 deletions(-) create mode 100644 lib/promiseHelper.js create mode 100644 test/promiseHelper.js diff --git a/client/constants/messages.js b/client/constants/messages.js index 0b74517..dcf1fe4 100644 --- a/client/constants/messages.js +++ b/client/constants/messages.js @@ -82,7 +82,7 @@ const messages = { * browser sends this to the webview with the data that needs to be synced * to the sync server. */ - SEND_SYNC_RECORDS: _, /* @param {string} categoryName, @param {Array.} records */ + SEND_SYNC_RECORDS: _, /* @param {string=} categoryName, @param {Array.} records */ /** * browser -> webview * browser sends this to delete the current user. diff --git a/client/constants/proto.js b/client/constants/proto.js index 41d3803..5c01518 100644 --- a/client/constants/proto.js +++ b/client/constants/proto.js @@ -11,3 +11,10 @@ module.exports.actions = { UPDATE: 1, DELETE: 2 } + +module.exports.categoryMap = { + bookmark: 'BOOKMARKS', + historySite: 'HISTORY_SITES', + siteSetting: 'PREFERENCES', + device: 'PREFERENCES' +} diff --git a/client/recordUtil.js b/client/recordUtil.js index 8a85f9a..28fc37c 100644 --- a/client/recordUtil.js +++ b/client/recordUtil.js @@ -5,6 +5,9 @@ const proto = require('./constants/proto') const serializer = require('../lib/serializer') const valueEquals = require('../lib/valueEquals') +// ['0', '1', '2'] +module.exports.CATEGORY_IDS = Object.values(proto.categories) + /** * @param {string} type e.g. 'historySite' * @param {Function} isValidRecord checks if the update record has enough props to make a create record @@ -290,3 +293,18 @@ module.exports.syncRecordAsJS = (record) => { } return object } + +/** + * Derive category ID number from a JS Sync record. + * @param {Object} record e.g. {"action":0, "bookmark": {"isFolder": false,"site": {...}, ...} + * @returns {string} e.g. '0' for bookmark + */ +module.exports.getRecordCategory = (record) => { + for (let type in proto.categoryMap) { + if (record[type]) { + const categoryName = proto.categoryMap[type] + if (!categoryName) { return undefined } + return proto.categories[categoryName] + } + } +} diff --git a/client/requestUtil.js b/client/requestUtil.js index 0ab3ed8..f0c6483 100644 --- a/client/requestUtil.js +++ b/client/requestUtil.js @@ -2,11 +2,14 @@ const awsSdk = require('aws-sdk') const cryptoUtil = require('./cryptoUtil') +const recordUtil = require('./recordUtil') const proto = require('./constants/proto') +const {limitConcurrency} = require('../lib/promiseHelper') const s3Helper = require('../lib/s3Helper') const serializer = require('../lib/serializer') const CONFIG = require('./config') +const PUT_CONCURRENCY = 100 const S3_MAX_RETRIES = 1 const EXPIRED_CREDENTIAL_ERRORS = [ /The provided token has expired\./, @@ -55,6 +58,10 @@ const RequestUtil = function (opts = {}) { this.encrypt = cryptoUtil.Encrypt(this.serializer, opts.keys.secretboxKey, CONFIG.nonceCounter) this.decrypt = cryptoUtil.Decrypt(this.serializer, opts.keys.secretboxKey) this.sign = cryptoUtil.Sign(opts.keys.secretKey) + this.putConcurrency = opts.putConcurrency || PUT_CONCURRENCY + // Like put() but with limited concurrency to avoid out of memory/connection + // errors (net::ERR_INSUFFICIENT_RESOURCES) + this.bufferedPut = limitConcurrency(RequestUtil.prototype.put, this.putConcurrency) if (opts.credentialsBytes) { const credentials = this.parseAWSResponse(opts.credentialsBytes) this.saveAWSCredentials(credentials) @@ -217,12 +224,18 @@ RequestUtil.prototype.currentRecordPrefix = function (category) { /** * Puts a single record, splitting it into multiple objects if needed. - * @param {string} category - the category ID - * @param {Uint8Array} record - the object content, serialized and encrypted + * See also bufferedPut() assigned in the constructor. + * @param {string=} category - the category ID + * @param {object} record - the object content */ RequestUtil.prototype.put = function (category, record) { - const s3Prefix = this.currentRecordPrefix(category) - const s3Keys = s3Helper.encodeDataToS3KeyArray(s3Prefix, record) + const thisCategory = category || recordUtil.getRecordCategory(record) + if (!recordUtil.CATEGORY_IDS.includes(thisCategory)) { + throw new Error(`Unsupported sync category: ${category}`) + } + const encryptedRecord = this.encrypt(record) + const s3Prefix = this.currentRecordPrefix(thisCategory) + const s3Keys = s3Helper.encodeDataToS3KeyArray(s3Prefix, encryptedRecord) return this.withRetry(() => { const fetchPromises = s3Keys.map((key, _i) => { const params = { diff --git a/client/sync.js b/client/sync.js index efcdc2d..66b6ca2 100644 --- a/client/sync.js +++ b/client/sync.js @@ -128,9 +128,8 @@ const startSync = (requester) => { ipc.send(messages.RESOLVED_SYNC_RECORDS, category, resolvedRecords) }) ipc.on(messages.SEND_SYNC_RECORDS, (e, category, records) => { - if (!proto.categories[category]) { - throw new Error(`Unsupported sync category: ${category}`) - } + logSync(`Sending ${records.length} records`) + const categoryId = proto.categories[category] records.forEach((record) => { // Workaround #17 record.deviceId = new Uint8Array(record.deviceId) @@ -139,7 +138,7 @@ const startSync = (requester) => { record.bookmark.parentFolderObjectId = new Uint8Array(record.bookmark.parentFolderObjectId) } logSync(`sending record: ${JSON.stringify(record)}`) - requester.put(proto.categories[category], requester.encrypt(record)) + requester.bufferedPut(categoryId, record) }) }) ipc.on(messages.DELETE_SYNC_USER, (e) => { diff --git a/lib/promiseHelper.js b/lib/promiseHelper.js new file mode 100644 index 0000000..6379228 --- /dev/null +++ b/lib/promiseHelper.js @@ -0,0 +1,42 @@ +'use strict' + +/** + * Wrap a Promise-returning function so calls to it fill a queue which has + * a concurrency limit. + * e.g. there is an API rate limited to 10 concurrent connections. + * const getApi = (arg) => window.fetch(arg) + * const throttledGetApi = limitConcurrency(getApi, 10) + * for (let i; i < 1000; i++) { throttledGetApi(i) } + * @param fn {function} Function which returns a Promise + * @param concurrency {number} Maximum pending/concurrent fn calls + * @returns {function} + */ +module.exports.limitConcurrency = function (fn, concurrency) { + var queue = null + var active = [] + const enqueueFnFactory = function (_this, args) { + return function () { + const enqueued = fn.apply(_this, args) + enqueued.then(function () { + active.splice(active.indexOf(enqueued), 1) + }) + active.push(enqueued) + return { + enqueued, + newQueue: Promise.race(active) + } + } + } + return function () { + var enqueueFn = enqueueFnFactory(this, arguments) + if (active.length < concurrency) { + const promises = enqueueFn() + queue = promises.newQueue + return promises.enqueued + } else { + const advanceQueue = queue.then(enqueueFn) + queue = advanceQueue.then(promises => promises.newQueue) + return advanceQueue.then(promises => promises.enqueued) + } + } +} diff --git a/test/client/recordUtil.js b/test/client/recordUtil.js index 605b7de..2b2ee2e 100644 --- a/test/client/recordUtil.js +++ b/test/client/recordUtil.js @@ -52,6 +52,7 @@ const updateSiteProps = {customTitle: 'a ball pit filled with plush coconuts'} const recordBookmark = Record({objectData: 'bookmark', bookmark: props.bookmark}) const recordHistorySite = Record({objectData: 'historySite', historySite: siteProps}) const recordSiteSetting = Record({objectData: 'siteSetting', siteSetting: props.siteSetting}) +const recordDevice = Record({objectData: 'device', device: {name: 'test pyramid'}}) const baseRecords = [recordBookmark, recordHistorySite, recordSiteSetting] const updateBookmark = UpdateRecord({ @@ -509,3 +510,16 @@ test('recordUtil.syncRecordAsJS()', (t) => { conversionEquals({ objectData: 'device', device }) }) }) + +test('recordUtil.getRecordCategory()', (t) => { + t.plan(8) + const brokenRecord = Record({}) + t.equals(recordUtil.getRecordCategory(recordBookmark), '0') + t.equals(recordUtil.getRecordCategory(updateBookmark), '0') + t.equals(recordUtil.getRecordCategory(recordHistorySite), '1') + t.equals(recordUtil.getRecordCategory(updateHistorySite), '1') + t.equals(recordUtil.getRecordCategory(recordSiteSetting), '2') + t.equals(recordUtil.getRecordCategory(updateSiteSetting), '2') + t.equals(recordUtil.getRecordCategory(recordDevice), '2') + t.equals(recordUtil.getRecordCategory(brokenRecord), undefined) +}) diff --git a/test/client/requestUtil.js b/test/client/requestUtil.js index 57b676e..fa023af 100644 --- a/test/client/requestUtil.js +++ b/test/client/requestUtil.js @@ -2,7 +2,6 @@ const test = require('tape') const testHelper = require('../testHelper') const timekeeper = require('timekeeper') const clientTestHelper = require('./testHelper') -const cryptoUtil = require('../../client/cryptoUtil') const Serializer = require('../../lib/serializer') const RequestUtil = require('../../client/requestUtil') const proto = require('../../client/constants/proto') @@ -45,7 +44,6 @@ test('client RequestUtil', (t) => { .catch((error) => { console.log(`Cleanup failed: ${error}`) }) }) const serializer = requestUtil.serializer - const encrypt = cryptoUtil.Encrypt(serializer, keys.secretboxKey, 0) t.plan(2) t.test('#put preference: device', (t) => { @@ -60,7 +58,7 @@ test('client RequestUtil', (t) => { device: {name} } timekeeper.freeze(1480000000 * 1000) - requestUtil.put(proto.categories.PREFERENCES, encrypt(record)) + requestUtil.put(proto.categories.PREFERENCES, record) .then((response) => { timekeeper.reset() t.pass(`${t.name} resolves`) @@ -102,7 +100,7 @@ test('client RequestUtil', (t) => { objectId, device: {name} } - const putRequest = requestUtil.put(proto.categories.PREFERENCES, encrypt(record)) + const putRequest = requestUtil.put(proto.categories.PREFERENCES, record) timekeeper.reset() return putRequest } @@ -156,7 +154,7 @@ test('client RequestUtil', (t) => { t.test('#put history site: large URL (multipart)', (t) => { t.plan(2) timekeeper.freeze(1480000000 * 1000) - requestUtil.put(proto.categories.HISTORY_SITES, encrypt(record)) + requestUtil.put(proto.categories.HISTORY_SITES, record) .then((response) => { timekeeper.reset() t.pass(`${t.name} resolves`) @@ -236,8 +234,8 @@ test('client RequestUtil', (t) => { } Promise.all([ - requestUtil.put(proto.categories.PREFERENCES, encrypt(deviceRecord)), - requestUtil.put(proto.categories.PREFERENCES, encrypt(siteSettingRecord)) + requestUtil.put(proto.categories.PREFERENCES, deviceRecord), + requestUtil.put(proto.categories.PREFERENCES, siteSettingRecord) ]) .then(() => { requestUtil.deleteSiteSettings() @@ -268,7 +266,7 @@ test('client RequestUtil', (t) => { siteSetting: {hostPattern: 'https://google.com', shieldsUp: true} } - requestUtil.put(proto.categories.PREFERENCES, encrypt(siteSettingRecord)) + requestUtil.put(proto.categories.PREFERENCES, siteSettingRecord) .then(() => { requestUtil.list(proto.categories.PREFERENCES, 0) .then((s3Objects) => { @@ -365,7 +363,7 @@ test('client RequestUtil', (t) => { device: {name: 'sweet'} } const requestUtil = new RequestUtil(expiredArgs) - requestUtil.put(proto.categories.PREFERENCES, encrypt(record)) + requestUtil.put(proto.categories.PREFERENCES, record) .then((response) => { t.pass(t.name) testCredentialRefreshDelete(t) diff --git a/test/promiseHelper.js b/test/promiseHelper.js new file mode 100644 index 0000000..4346287 --- /dev/null +++ b/test/promiseHelper.js @@ -0,0 +1,34 @@ +const test = require('tape') +const promiseHelper = require('../lib/promiseHelper') + +test('promiseHelper', (t) => { + t.plan(1) + + t.test('limitConcurrency', (t) => { + t.plan(1) + + t.test('calls the original function the same number of times with correct args', (t) => { + t.plan(2) + const EXPECTED_CALL_COUNT = 100 + let callCount = 0 + const asyncFun = (i) => new Promise((resolve, reject) => { + setTimeout(() => { + callCount += 1 + resolve(i) + }, Math.round(10 * Math.random())) + }) + const throttedAsyncFun = promiseHelper.limitConcurrency(asyncFun, 3) + const promises = [] + let expectedSum = 0 + for (let i = 0; i < EXPECTED_CALL_COUNT; i++) { + promises.push(throttedAsyncFun(i)) + expectedSum += i + } + Promise.all(promises).then((results) => { + const sum = results.reduce((a, b) => a + b) + t.equal(callCount, EXPECTED_CALL_COUNT) + t.equal(sum, expectedSum) + }) + }) + }) +})