Skip to content
This repository has been archived by the owner on Jul 31, 2020. It is now read-only.

Commit

Permalink
Add requestUtil.bufferedPut to limit concurrency of put
Browse files Browse the repository at this point in the history
Deprecate SEND_SYNC_RECORDS category to be optional and derived
from each record.

Required for #112
  • Loading branch information
ayumi committed Jun 29, 2017
1 parent 59eccd7 commit eae9b0d
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 18 deletions.
2 changes: 1 addition & 1 deletion client/constants/messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ const messages = {
* browser sends this to the webview with the data that needs to be synced
* to the sync server.
*/
SEND_SYNC_RECORDS: _, /* @param {string} categoryName, @param {Array.<Object>} records */
SEND_SYNC_RECORDS: _, /* @param {string=} categoryName, @param {Array.<Object>} records */
/**
* browser -> webview
* browser sends this to delete the current user.
Expand Down
7 changes: 7 additions & 0 deletions client/constants/proto.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,10 @@ module.exports.actions = {
UPDATE: 1,
DELETE: 2
}

module.exports.categoryMap = {
bookmark: 'BOOKMARKS',
historySite: 'HISTORY_SITES',
siteSetting: 'PREFERENCES',
device: 'PREFERENCES'
}
18 changes: 18 additions & 0 deletions client/recordUtil.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ const proto = require('./constants/proto')
const serializer = require('../lib/serializer')
const valueEquals = require('../lib/valueEquals')

// ['0', '1', '2']
module.exports.CATEGORY_IDS = Object.values(proto.categories)

/**
* @param {string} type e.g. 'historySite'
* @param {Function} isValidRecord checks if the update record has enough props to make a create record
Expand Down Expand Up @@ -290,3 +293,18 @@ module.exports.syncRecordAsJS = (record) => {
}
return object
}

/**
* Derive category ID number from a JS Sync record.
* @param {Object} record e.g. {"action":0, "bookmark": {"isFolder": false,"site": {...}, ...}
* @returns {string} e.g. '0' for bookmark
*/
module.exports.getRecordCategory = (record) => {
for (let type in proto.categoryMap) {
if (record[type]) {
const categoryName = proto.categoryMap[type]
if (!categoryName) { return undefined }
return proto.categories[categoryName]
}
}
}
21 changes: 17 additions & 4 deletions client/requestUtil.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

const awsSdk = require('aws-sdk')
const cryptoUtil = require('./cryptoUtil')
const recordUtil = require('./recordUtil')
const proto = require('./constants/proto')
const {limitConcurrency} = require('../lib/promiseHelper')
const s3Helper = require('../lib/s3Helper')
const serializer = require('../lib/serializer')

const CONFIG = require('./config')
const PUT_CONCURRENCY = 100
const S3_MAX_RETRIES = 1
const EXPIRED_CREDENTIAL_ERRORS = [
/The provided token has expired\./,
Expand Down Expand Up @@ -55,6 +58,10 @@ const RequestUtil = function (opts = {}) {
this.encrypt = cryptoUtil.Encrypt(this.serializer, opts.keys.secretboxKey, CONFIG.nonceCounter)
this.decrypt = cryptoUtil.Decrypt(this.serializer, opts.keys.secretboxKey)
this.sign = cryptoUtil.Sign(opts.keys.secretKey)
this.putConcurrency = opts.putConcurrency || PUT_CONCURRENCY
// Like put() but with limited concurrency to avoid out of memory/connection
// errors (net::ERR_INSUFFICIENT_RESOURCES)
this.bufferedPut = limitConcurrency(RequestUtil.prototype.put, this.putConcurrency)
if (opts.credentialsBytes) {
const credentials = this.parseAWSResponse(opts.credentialsBytes)
this.saveAWSCredentials(credentials)
Expand Down Expand Up @@ -217,12 +224,18 @@ RequestUtil.prototype.currentRecordPrefix = function (category) {

/**
* Puts a single record, splitting it into multiple objects if needed.
* @param {string} category - the category ID
* @param {Uint8Array} record - the object content, serialized and encrypted
* See also bufferedPut() assigned in the constructor.
* @param {string=} category - the category ID
* @param {object} record - the object content
*/
RequestUtil.prototype.put = function (category, record) {
const s3Prefix = this.currentRecordPrefix(category)
const s3Keys = s3Helper.encodeDataToS3KeyArray(s3Prefix, record)
const thisCategory = category || recordUtil.getRecordCategory(record)
if (!recordUtil.CATEGORY_IDS.includes(thisCategory)) {
throw new Error(`Unsupported sync category: ${category}`)
}
const encryptedRecord = this.encrypt(record)
const s3Prefix = this.currentRecordPrefix(thisCategory)
const s3Keys = s3Helper.encodeDataToS3KeyArray(s3Prefix, encryptedRecord)
return this.withRetry(() => {
const fetchPromises = s3Keys.map((key, _i) => {
const params = {
Expand Down
7 changes: 3 additions & 4 deletions client/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,8 @@ const startSync = (requester) => {
ipc.send(messages.RESOLVED_SYNC_RECORDS, category, resolvedRecords)
})
ipc.on(messages.SEND_SYNC_RECORDS, (e, category, records) => {
if (!proto.categories[category]) {
throw new Error(`Unsupported sync category: ${category}`)
}
logSync(`Sending ${records.length} records`)
const categoryId = proto.categories[category]
records.forEach((record) => {
// Workaround #17
record.deviceId = new Uint8Array(record.deviceId)
Expand All @@ -139,7 +138,7 @@ const startSync = (requester) => {
record.bookmark.parentFolderObjectId = new Uint8Array(record.bookmark.parentFolderObjectId)
}
logSync(`sending record: ${JSON.stringify(record)}`)
requester.put(proto.categories[category], requester.encrypt(record))
requester.bufferedPut(categoryId, record)
})
})
ipc.on(messages.DELETE_SYNC_USER, (e) => {
Expand Down
42 changes: 42 additions & 0 deletions lib/promiseHelper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
'use strict'

/**
* Wrap a Promise-returning function so calls to it fill a queue which has
* a concurrency limit.
* e.g. there is an API rate limited to 10 concurrent connections.
* const getApi = (arg) => window.fetch(arg)
* const throttledGetApi = limitConcurrency(getApi, 10)
* for (let i; i < 1000; i++) { throttledGetApi(i) }
* @param fn {function} Function which returns a Promise
* @param concurrency {number} Maximum pending/concurrent fn calls
* @returns {function}
*/
module.exports.limitConcurrency = function (fn, concurrency) {
var queue = null
var active = []
const enqueueFnFactory = function (_this, args) {
return function () {
const enqueued = fn.apply(_this, args)
enqueued.then(function () {
active.splice(active.indexOf(enqueued), 1)
})
active.push(enqueued)
return {
enqueued,
newQueue: Promise.race(active)
}
}
}
return function () {
var enqueueFn = enqueueFnFactory(this, arguments)
if (active.length < concurrency) {
const promises = enqueueFn()
queue = promises.newQueue
return promises.enqueued
} else {
const advanceQueue = queue.then(enqueueFn)
queue = advanceQueue.then(promises => promises.newQueue)
return advanceQueue.then(promises => promises.enqueued)
}
}
}
14 changes: 14 additions & 0 deletions test/client/recordUtil.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const updateSiteProps = {customTitle: 'a ball pit filled with plush coconuts'}
const recordBookmark = Record({objectData: 'bookmark', bookmark: props.bookmark})
const recordHistorySite = Record({objectData: 'historySite', historySite: siteProps})
const recordSiteSetting = Record({objectData: 'siteSetting', siteSetting: props.siteSetting})
const recordDevice = Record({objectData: 'device', device: {name: 'test pyramid'}})
const baseRecords = [recordBookmark, recordHistorySite, recordSiteSetting]

const updateBookmark = UpdateRecord({
Expand Down Expand Up @@ -509,3 +510,16 @@ test('recordUtil.syncRecordAsJS()', (t) => {
conversionEquals({ objectData: 'device', device })
})
})

test('recordUtil.getRecordCategory()', (t) => {
t.plan(8)
const brokenRecord = Record({})
t.equals(recordUtil.getRecordCategory(recordBookmark), '0')
t.equals(recordUtil.getRecordCategory(updateBookmark), '0')
t.equals(recordUtil.getRecordCategory(recordHistorySite), '1')
t.equals(recordUtil.getRecordCategory(updateHistorySite), '1')
t.equals(recordUtil.getRecordCategory(recordSiteSetting), '2')
t.equals(recordUtil.getRecordCategory(updateSiteSetting), '2')
t.equals(recordUtil.getRecordCategory(recordDevice), '2')
t.equals(recordUtil.getRecordCategory(brokenRecord), undefined)
})
16 changes: 7 additions & 9 deletions test/client/requestUtil.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ const test = require('tape')
const testHelper = require('../testHelper')
const timekeeper = require('timekeeper')
const clientTestHelper = require('./testHelper')
const cryptoUtil = require('../../client/cryptoUtil')
const Serializer = require('../../lib/serializer')
const RequestUtil = require('../../client/requestUtil')
const proto = require('../../client/constants/proto')
Expand Down Expand Up @@ -45,7 +44,6 @@ test('client RequestUtil', (t) => {
.catch((error) => { console.log(`Cleanup failed: ${error}`) })
})
const serializer = requestUtil.serializer
const encrypt = cryptoUtil.Encrypt(serializer, keys.secretboxKey, 0)

t.plan(2)
t.test('#put preference: device', (t) => {
Expand All @@ -60,7 +58,7 @@ test('client RequestUtil', (t) => {
device: {name}
}
timekeeper.freeze(1480000000 * 1000)
requestUtil.put(proto.categories.PREFERENCES, encrypt(record))
requestUtil.put(proto.categories.PREFERENCES, record)
.then((response) => {
timekeeper.reset()
t.pass(`${t.name} resolves`)
Expand Down Expand Up @@ -102,7 +100,7 @@ test('client RequestUtil', (t) => {
objectId,
device: {name}
}
const putRequest = requestUtil.put(proto.categories.PREFERENCES, encrypt(record))
const putRequest = requestUtil.put(proto.categories.PREFERENCES, record)
timekeeper.reset()
return putRequest
}
Expand Down Expand Up @@ -156,7 +154,7 @@ test('client RequestUtil', (t) => {
t.test('#put history site: large URL (multipart)', (t) => {
t.plan(2)
timekeeper.freeze(1480000000 * 1000)
requestUtil.put(proto.categories.HISTORY_SITES, encrypt(record))
requestUtil.put(proto.categories.HISTORY_SITES, record)
.then((response) => {
timekeeper.reset()
t.pass(`${t.name} resolves`)
Expand Down Expand Up @@ -236,8 +234,8 @@ test('client RequestUtil', (t) => {
}

Promise.all([
requestUtil.put(proto.categories.PREFERENCES, encrypt(deviceRecord)),
requestUtil.put(proto.categories.PREFERENCES, encrypt(siteSettingRecord))
requestUtil.put(proto.categories.PREFERENCES, deviceRecord),
requestUtil.put(proto.categories.PREFERENCES, siteSettingRecord)
])
.then(() => {
requestUtil.deleteSiteSettings()
Expand Down Expand Up @@ -268,7 +266,7 @@ test('client RequestUtil', (t) => {
siteSetting: {hostPattern: 'https://google.com', shieldsUp: true}
}

requestUtil.put(proto.categories.PREFERENCES, encrypt(siteSettingRecord))
requestUtil.put(proto.categories.PREFERENCES, siteSettingRecord)
.then(() => {
requestUtil.list(proto.categories.PREFERENCES, 0)
.then((s3Objects) => {
Expand Down Expand Up @@ -365,7 +363,7 @@ test('client RequestUtil', (t) => {
device: {name: 'sweet'}
}
const requestUtil = new RequestUtil(expiredArgs)
requestUtil.put(proto.categories.PREFERENCES, encrypt(record))
requestUtil.put(proto.categories.PREFERENCES, record)
.then((response) => {
t.pass(t.name)
testCredentialRefreshDelete(t)
Expand Down
34 changes: 34 additions & 0 deletions test/promiseHelper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
const test = require('tape')
const promiseHelper = require('../lib/promiseHelper')

test('promiseHelper', (t) => {
t.plan(1)

t.test('limitConcurrency', (t) => {
t.plan(1)

t.test('calls the original function the same number of times with correct args', (t) => {
t.plan(2)
const EXPECTED_CALL_COUNT = 100
let callCount = 0
const asyncFun = (i) => new Promise((resolve, reject) => {
setTimeout(() => {
callCount += 1
resolve(i)
}, Math.round(10 * Math.random()))
})
const throttedAsyncFun = promiseHelper.limitConcurrency(asyncFun, 3)
const promises = []
let expectedSum = 0
for (let i = 0; i < EXPECTED_CALL_COUNT; i++) {
promises.push(throttedAsyncFun(i))
expectedSum += i
}
Promise.all(promises).then((results) => {
const sum = results.reduce((a, b) => a + b)
t.equal(callCount, EXPECTED_CALL_COUNT)
t.equal(sum, expectedSum)
})
})
})
})

0 comments on commit eae9b0d

Please sign in to comment.