Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes collectionRevisions Promise interface #65

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 93 additions & 100 deletions src/simperium/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ internal.indexingComplete = function() {
* @returns {Promise<Void>} - resolves once the change version is saved
*/


/**
* Maintains syncing state for a Simperium bucket.
*
Expand Down Expand Up @@ -451,15 +450,16 @@ Channel.prototype.remove = function( id ) {
* @returns {Promise<Array<BucketObjectRevision>>} list of known object versions
*/
Channel.prototype.getRevisions = function( id ) {
return new Promise( ( resolve, reject ) => {
collectionRevisions( this, id, ( error, revisions ) => {
if ( error ) {
reject( error );
return;
}
resolve( revisions );
} );
} );
/**
* Since revision data is basically immutable we can prevent the
* need to refetch it after it has been loaded once.
*
* E.g. key could be `${ entityId }.${ versionNumber }`
*
* @type {Map<String,Object>} stores specific revisions as a cache
*/
const revisionCache = new Map();
return collectionRevisions( this, id, revisionCache );
}

/**
Expand Down Expand Up @@ -855,16 +855,6 @@ LocalQueue.prototype.resendSentChanges = function() {
}
}

/**
* Since revision data is basically immutable we can prevent the
* need to refetch it after it has been loaded once.
*
* E.g. key could be `${ entityId }.${ versionNumber }`
*
* @type {Map<String,Object>} stores specific revisions as a cache
*/
export const revisionCache = new Map();

/**
* Attempts to fetch an entity's revisions
*
Expand All @@ -883,9 +873,10 @@ export const revisionCache = new Map();
*
* @param {Object} channel used to send messages to the Simperium server
* @param {String} id entity id for which to fetch revisions
* @param {Function} callback called on error or when finished
* @param {Map<string,Object>} cache for storing already requeted revisions
* @returns {Promise<Map<string,Object>>} resolves to the fetched revisions
*/
function collectionRevisions( channel, id, callback ) {
function collectionRevisions( channel, id, cache ) {
/** @type {Number} ms delay arbitrarily chosen to give up on fetch */
const TIMEOUT = 200;

Expand All @@ -901,96 +892,98 @@ function collectionRevisions( channel, id, callback ) {
/** @type {Number} handle for "start finishing" timeout */
let timeout;

/**
* Receive a version update from the server and
* dispatch the next fetch or finish the fetching
*
* @param {String} id entity id
* @param {Number} version version of returned entity
* @param {Object} data value of entity at revision
*/
function onVersion( id, version, data ) {
revisionCache.set( `${ id }.${ version }`, data );
versions.push( { id, version, data } );

// if we have every possible revision already, finish it!
// this bypasses any mandatory delay
if ( versions.length === latestVersion ) {
finish();
return;
}

fetchNextVersion( version );
return new Promise( ( resolve, reject ) => {
/**
* Receive a version update from the server and
* dispatch the next fetch or finish the fetching
*
* @param {String} id entity id
* @param {Number} version version of returned entity
* @param {Object} data value of entity at revision
*/
function onVersion( id, version, data ) {
cache.set( `${ id }.${ version }`, data );
versions.push( { id, version, data } );

// if we have every possible revision already, finish it!
// this bypasses any mandatory delay
if ( versions.length === latestVersion ) {
finish();
return;
}

// defer the final response to the application
clearTimeout( timeout );
timeout = setTimeout( finish, TIMEOUT );
}
fetchNextVersion( version );

/**
* Stop listening for versions and stop fetching them
* and pass accumulated data back to application
*/
function finish() {
clearTimeout( timeout );
channel.removeListener( `version.${ id }`, onVersion );
// defer the final response to the application
clearTimeout( timeout );
timeout = setTimeout( finish, TIMEOUT );
}

// sort newest first
callback( null, versions.sort( ( a, b ) => b.version - a.version ) );
}
/**
* Stop listening for versions and stop fetching them
* and pass accumulated data back to application
*/
function finish() {
clearTimeout( timeout );
channel.removeListener( `version.${ id }`, onVersion );

/**
* Find the next version which isn't around and issue
* a fetch if possible
*
* @param {Number} prevVersion starting point for finding next version
*/
function fetchNextVersion( prevVersion ) {
let version = prevVersion;

// find the next version to request
// some could have come back already
// or been requested already
while ( version > 0 && requestedVersions.has( version ) ) {
version -= 1;
// sort newest first
resolve( versions.sort( ( a, b ) => b.version - a.version ) );
}

// we have them all
if ( ! version ) {
return;
}
/**
* Find the next version which isn't around and issue
* a fetch if possible
*
* @param {Number} prevVersion starting point for finding next version
*/
function fetchNextVersion( prevVersion ) {
let version = prevVersion;

// find the next version to request
// some could have come back already
// or been requested already
while ( version > 0 && requestedVersions.has( version ) ) {
version -= 1;
}

requestedVersions.add( version );
// we have them all
if ( ! version ) {
return;
}

requestedVersions.add( version );

// fetch from server or local cache
if ( revisionCache.has( `${ id }.${ version }` ) ) {
onVersion( id, version, revisionCache.get( `${ id }.${ version }` ) );
} else {
channel.send( `e:${ id }.${ version }` );
// fetch from server or local cache
if ( cache.has( `${ id }.${ version }` ) ) {
onVersion( id, version, cache.get( `${ id }.${ version }` ) );
} else {
channel.send( `e:${ id }.${ version }` );
}
}
}

// start listening for the responses
channel.on( `version.${ id }`, onVersion );
// start listening for the responses
channel.on( `version.${ id }`, onVersion );

// request the first revision and start the sequence
// pre-emptively fetch as many as could exist by default
channel.store.get( id ).then( ( { version } ) => {
latestVersion = version;
// request the first revision and start the sequence
// pre-emptively fetch as many as could exist by default
channel.store.get( id ).then( ( { version } ) => {
latestVersion = version;

// grab latest change revisions
for ( let i = 0; i < 60 && ( version - i ) > 0; i++ ) {
fetchNextVersion( version - i );
}
// grab latest change revisions
for ( let i = 0; i < 60 && ( version - i ) > 0; i++ ) {
fetchNextVersion( version - i );
}

// grab archive revisions
// these are like 1, 11, 21, 31, …, 41, normal revisions [42, 43, 44, 45, …]
const firstArchive = Math.round( ( version - 60 ) / 10 ) * 10 + 1; // 127 -> 67 -> 6 -> 60 -> 61
for ( let i = 0; i < 100 && ( firstArchive - 10 * i ) > 0; i++ ) {
fetchNextVersion( firstArchive - 10 * i );
}
}, callback );
// grab archive revisions
// these are like 1, 11, 21, 31, …, 41, normal revisions [42, 43, 44, 45, …]
const firstArchive = Math.round( ( version - 60 ) / 10 ) * 10 + 1; // 127 -> 67 -> 6 -> 60 -> 61
for ( let i = 0; i < 100 && ( firstArchive - 10 * i ) > 0; i++ ) {
fetchNextVersion( firstArchive - 10 * i );
}
}, reject );

// and set an initial timeout for failed connections
timeout = setTimeout( finish, TIMEOUT * 4 );
// and set an initial timeout for failed connections
timeout = setTimeout( finish, TIMEOUT * 4 );
} );
}