-
-
Notifications
You must be signed in to change notification settings - Fork 53
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
8329901
commit d231cdc
Showing
2 changed files
with
163 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
(ns taoensso.encore.runner | ||
"Alpha, subject to change without notice! | ||
Runner util for easily configurable a/sync fn execution. | ||
Compared to agents: | ||
- Runners have (configurable) back-pressure mechanism. | ||
- Runners may be non-linear when n-threads > 1. | ||
- Runners take nullary fns rather than unary fns of state. | ||
- Runners have no validators or watches. | ||
- Runners auto shutdown their threads on JVM shutdown." | ||
|
||
{:added "vX.Y.Z (YYYY-MM-DD)"} | ||
(:require | ||
[taoensso.encore :as enc :refer [have have?]])) | ||
|
||
(defn runner | ||
"Returns a new stateful (fn runner ([]) ([f])) such that: | ||
(runner f) => Runner should execute given nullary fn according to runner opts. | ||
Returns: | ||
nil if runner has stopped accepting new execution requests. | ||
true if fn was accepted for execution *without* back pressure. | ||
false if runner's back-pressure mechanism was engaged. | ||
(runner) => Runner should stop accepting new execution requests. | ||
Returns true iff runner's status changed with this call. | ||
Runners are a little like simplified agents with an explicit and configurable | ||
back-pressure mechanism. | ||
Options include: | ||
`mode` - Specifies runner's mode of operation, | ||
∈ #{:sync :async/blocking :async/dropping :async/sliding}. | ||
`buffer-size` - Specifies size of async buffer before back-pressure | ||
mechanism is engaged. | ||
`n-threads` - Specifies number of threads for asynchronously executing fns. | ||
NB execution order may be non-sequential when n > 1." | ||
|
||
([opts] (runner nil opts)) | ||
([id ; Handy for memoization wrappers | ||
{:keys [mode buffer-size n-threads thread-name daemon-threads?] :as opts | ||
:or | ||
{mode :async/sliding | ||
buffer-size 1024 | ||
n-threads 1}}] | ||
|
||
(case mode | ||
:sync | ||
(let [stopped?_ (volatile! false)] | ||
(fn sync-runner | ||
([ ] (when-not @stopped?_ (vreset! stopped?_ true))) | ||
([f] (if @stopped?_ nil (do (enc/catching (f)) true))))) | ||
|
||
(:async/blocking :async/dropping :async/sliding) | ||
(let [stopped?_ (volatile! false) | ||
abq (java.util.concurrent.ArrayBlockingQueue. | ||
(enc/as-pos-int buffer-size) false) | ||
init_ | ||
(delay | ||
(when-not daemon-threads? | ||
(.addShutdownHook (Runtime/getRuntime) | ||
(Thread. (fn stop-runner [] (vreset! stopped?_ true))))) | ||
|
||
(dotimes [n (enc/as-pos-int n-threads)] | ||
(let [wfn | ||
(fn worker-fn [] | ||
(loop [] | ||
(if-let [f (.poll abq 2000 java.util.concurrent.TimeUnit/MILLISECONDS)] | ||
;; Recur unconditionally to drain abq even when stopped | ||
(do (enc/catching (f)) (recur)) | ||
(when-not @stopped?_ (recur))))) | ||
|
||
thread-name (enc/str-join-once "-" [(or thread-name `runner) id "loop" (inc n) "of" n-threads]) | ||
thread (Thread. wfn thread-name)] | ||
|
||
(when daemon-threads? | ||
(.setDaemon thread true)) | ||
|
||
;; (println "Starting thread:" thread-name) | ||
(.start thread)))) | ||
|
||
run-fn | ||
(case mode | ||
:async/blocking (fn [f] (or (.offer abq f) (.put abq f) false)) | ||
:async/dropping (fn [f] (or (.offer abq f) false)) | ||
:async/sliding | ||
(fn [f] | ||
(or | ||
(.offer abq f) ; Common case | ||
(loop [] | ||
(.poll abq) ; Drop | ||
(if (.offer abq f) | ||
false ; Indicate that drop/s occurred | ||
(recur))))))] | ||
|
||
(if-let [msecs (get opts :_debug/init-after)] | ||
(do | ||
(future (Thread/sleep (int msecs)) @init_) | ||
(fn async-runner | ||
([ ] (when-not @stopped?_ (vreset! stopped?_ true))) | ||
([f] (if @stopped?_ nil (run-fn f))))) | ||
|
||
(fn async-runner | ||
([ ] (when-not @stopped?_ (vreset! stopped?_ true))) | ||
([f] (if @stopped?_ nil (do @init_ (run-fn f))))))) | ||
|
||
(enc/unexpected-arg! mode | ||
{:context `runner | ||
:expected #{:sync :async/blocking :async/dropping :async/sliding}})))) | ||
|
||
(comment | ||
(def get-runner (enc/memoize runner)) | ||
|
||
(enc/qb 1e6 ; [124.99 129.88] | ||
(get-runner :my-runner1 {:mode :sync}) | ||
(get-runner :my-runner1 {:mode :async/blocking})) | ||
|
||
(let [r1 (runner {:mode :sync}) | ||
r2 (runner {:mode :async/blocking})] | ||
(enc/qb 1e6 ; [40.68 158.46] | ||
(r1 (fn [])) | ||
(r2 (fn [])))) | ||
|
||
(enc/qb 1e6 ; [170.1 283.72] | ||
((get-runner :my-runner1 {:mode :sync}) (fn [])) | ||
((get-runner :my-runner1 {:mode :async/blocking}) (fn [])))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters