Skip to content

Commit

Permalink
issue #235 - first pass on handshake utility
Browse files Browse the repository at this point in the history
  • Loading branch information
bjouhier committed Sep 20, 2014
1 parent ef83ba2 commit adc434d
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 13 deletions.
33 changes: 26 additions & 7 deletions lib/callbacks/flows.js
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,30 @@
exports.funnel = builtins.funnel;


exports.handshake = function() {
var callback = null, notified = false;
return {
wait: __rt.streamlinify(function(cb) {
if (callback) { throw new Error("already waiting") };
if (notified) { setImmediate(cb); } else {
callback = cb; };
notified = false;
}, 0),
notify: function() {
if (!callback) { notified = true; } else {
setImmediate(callback); };
callback = null; } }; };


exports.collect = function exports_collect__10(_, futures) { var __frame = { name: "exports_collect__10", line: 171 }; return __func(_, this, arguments, exports_collect__10, 0, __frame, function __$exports_collect__10() { return (function __$exports_collect__10(_) {
var __2 = futures; if (!__2) { return _(null, __2); } ; return futures.map_(__cb(_, __frame, 1, 28, _, true), function __1(_, future) { var __frame = { name: "__1", line: 172 }; return __func(_, this, arguments, __1, 0, __frame, function __$__1() {
return future(__cb(_, __frame, 1, 10, _, true)); }); }); })(__cb(_, __frame, -170, 17, _, true)); }); };






exports.collect = function exports_collect__10(_, futures) { var __frame = { name: "exports_collect__10", line: 190 }; return __func(_, this, arguments, exports_collect__10, 0, __frame, function __$exports_collect__10() { return (function __$exports_collect__10(_) {
var __2 = futures; if (!__2) { return _(null, __2); } ; return futures.map_(__cb(_, __frame, 1, 28, _, true), function __1(_, future) { var __frame = { name: "__1", line: 191 }; return __func(_, this, arguments, __1, 0, __frame, function __$__1() {
return future(__cb(_, __frame, 1, 10, _, true)); }); }); })(__cb(_, __frame, -189, 17, _, true)); }); };



Expand All @@ -193,7 +212,7 @@
cb(); });


exports.nextTick = function exports_nextTick__11(_) { var __frame = { name: "exports_nextTick__11", line: 196 }; return __func(_, this, arguments, exports_nextTick__11, 0, __frame, function __$exports_nextTick__11() {
exports.nextTick = function exports_nextTick__11(_) { var __frame = { name: "exports_nextTick__11", line: 215 }; return __func(_, this, arguments, exports_nextTick__11, 0, __frame, function __$exports_nextTick__11() {
return nextTick(__cb(_, __frame, 1, 2, function __$exports_nextTick__11() { _(); }, true)); }); };


Expand All @@ -219,15 +238,15 @@
}, millis); };


exports.sleep = function exports_sleep__12(_, millis) { var __frame = { name: "exports_sleep__12", line: 222 }; return __func(_, this, arguments, exports_sleep__12, 0, __frame, function __$exports_sleep__12() {
exports.sleep = function exports_sleep__12(_, millis) { var __frame = { name: "exports_sleep__12", line: 241 }; return __func(_, this, arguments, exports_sleep__12, 0, __frame, function __$exports_sleep__12() {
return setTimeout(__cb(_, __frame, 1, 9, _, true), millis); }); };


exports.eventHandler = function(fn) {
return function() {
var that = this;
var args = Array.prototype.slice(arguments, 0);
return (function __1(_) { var __frame = { name: "__1", line: 230 }; return __func(_, this, arguments, __1, 0, __frame, function __$__1() {
return (function __1(_) { var __frame = { name: "__1", line: 249 }; return __func(_, this, arguments, __1, 0, __frame, function __$__1() {
return fn.apply_(__cb(_, __frame, 1, 14, _, true), that, args, 0); });
})(function(err) {
if (err) { throw err }; }); }; };
Expand All @@ -236,7 +255,7 @@



exports.apply = function apply(_, fn, thisObj, args, index) { var __frame = { name: "apply", line: 239 }; return __func(_, this, arguments, apply, 0, __frame, function __$apply() {
exports.apply = function apply(_, fn, thisObj, args, index) { var __frame = { name: "apply", line: 258 }; return __func(_, this, arguments, apply, 0, __frame, function __$apply() {
return fn.apply_(__cb(_, __frame, 1, 12, _, true), thisObj, args, index); }); };


Expand Down
19 changes: 19 additions & 0 deletions lib/compiler/flows._js
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,25 @@
/// won't be called, and no other operation will enter the funnel.
exports.funnel = builtins.funnel;

// simple handshake - document later.
exports.handshake = function() {
var callback = null, notified = false;
return {
wait: _(function(cb) {
if (callback) throw new Error("already waiting");
if (notified) setImmediate(cb);
else callback = cb;
notified = false;
}, 0),
notify: function() {
if (!callback) notified = true;
else setImmediate(callback);
callback = null;
},
};
}


///
/// * `results = flows.collect(_, futures)`
/// collects the results of an array of futures
Expand Down
19 changes: 19 additions & 0 deletions lib/fibers-fast/flows.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,25 @@
/// won't be called, and no other operation will enter the funnel.
exports.funnel = builtins.funnel;

// simple handshake - document later.
exports.handshake = function() {
var callback = null, notified = false;
return {
wait: fstreamline__.star(function(cb) {
if (callback) throw new Error("already waiting");
if (notified) setImmediate(cb);
else callback = cb;
notified = false;
}, 0),
notify: function() {
if (!callback) notified = true;
else setImmediate(callback);
callback = null;
},
};
}


///
/// * `results = flows.collect(_, futures)`
/// collects the results of an array of futures
Expand Down
31 changes: 25 additions & 6 deletions lib/fibers/flows.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,33 @@
/// won't be called, and no other operation will enter the funnel.
exports.funnel = builtins.funnel;

// simple handshake - document later.
exports.handshake = function() {
var callback = null, notified = false;
return {
wait: fstreamline__.streamlinify(function(cb) {
if (callback) throw new Error("already waiting");
if (notified) setImmediate(cb);
else callback = cb;
notified = false;
}, 0),
notify: function() {
if (!callback) notified = true;
else setImmediate(callback);
callback = null;
},
};
};


///
/// * `results = flows.collect(_, futures)`
/// collects the results of an array of futures
exports.collect = fstreamline__.create(function(_, futures) {
return futures && fstreamline__.invoke(futures, "map_", [_, fstreamline__.create(function(_, future) {
return fstreamline__.invoke(null, future, [_], 0);
}, 0,__filename,172)], 0);
}, 0,__filename,171);
}, 0,__filename,191)], 0);
}, 0,__filename,190);

// Obsolete API - use require('streamline/lib/globals').context instead
var globals = require("../globals");
Expand All @@ -195,7 +214,7 @@

exports.nextTick = fstreamline__.create(function(_) {
fstreamline__.invoke(null, nextTick, [_], 0);
}, 0,__filename,196);
}, 0,__filename,215);

exports.setTimeout = function(fn, millis) {
// node's setTimeout notifies immediately if millis > max!!
Expand All @@ -221,15 +240,15 @@

exports.sleep = fstreamline__.create(function(_, millis) {
return fstreamline__.invoke(null, setTimeout, [_, millis], 0);
}, 0,__filename,222);
}, 0,__filename,241);

exports.eventHandler = function(fn) {
return function() {
var that = this;
var args = Array.prototype.slice(arguments, 0);
return fstreamline__.create((function(_) {
return fstreamline__.invoke(fn, "apply_", [_, that, args, 0], 0);
}), 0,__filename,230)( function(err) {
}), 0,__filename,249)( function(err) {
if (err) throw err;
});
};
Expand All @@ -238,7 +257,7 @@
// Obsolete. Use `fn.apply_` instead.
exports.apply = fstreamline__.create(function apply(_, fn, thisObj, args, index) {
return fstreamline__.invoke(fn, "apply_", [_, thisObj, args, index], 0);
}, 0,__filename,239);
}, 0,__filename,258);

exports.callWithTimeout = fstreamline__.streamlinify(function(cb, fn, millis) {
var tid = setTimeout(function() {
Expand Down
19 changes: 19 additions & 0 deletions lib/generators-fast/flows.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,25 @@
/// won't be called, and no other operation will enter the funnel.
exports.funnel = builtins.funnel;

// simple handshake - document later.
exports.handshake = function() {
var callback = null, notified = false;
return {
wait: galaxy.star(function(cb) {
if (callback) throw new Error("already waiting");
if (notified) setImmediate(cb);
else callback = cb;
notified = false;
}, 0),
notify: function() {
if (!callback) notified = true;
else setImmediate(callback);
callback = null;
},
};
};


///
/// * `results = flows.collect(_, futures)`
/// collects the results of an array of futures
Expand Down
19 changes: 19 additions & 0 deletions lib/generators/flows.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,25 @@
/// won't be called, and no other operation will enter the funnel.
exports.funnel = builtins.funnel;

// simple handshake - document later.
exports.handshake = function() {
var callback = null, notified = false;
return {
wait: galaxy.streamlinify(function(cb) {
if (callback) throw new Error("already waiting");
if (notified) setImmediate(cb);
else callback = cb;
notified = false;
}, 0),
notify: function() {
if (!callback) notified = true;
else setImmediate(callback);
callback = null;
},
};
};


///
/// * `results = flows.collect(_, futures)`
/// collects the results of an array of futures
Expand Down

0 comments on commit adc434d

Please sign in to comment.