Skip to content

Commit

Permalink
[new] Add new runner ns
Browse files Browse the repository at this point in the history
  • Loading branch information
ptaoussanis committed Aug 30, 2023
1 parent cbadfda commit 25fa687
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 1 deletion.
121 changes: 121 additions & 0 deletions src/taoensso/encore/runner.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
(ns taoensso.encore.runner
"Alpha, subject to change without notice!
Runner util for easily configurable a/sync fn execution.
Compared to agents:
- Runners have (configurable) back-pressure mechanism.
- Runners may be non-linear when n-threads > 1.
- Runners take nullary fns rather than unary fns of state.
- Runners have no validators or watches.
- Runners auto shutdown their threads on JVM shutdown."

{:added "vX.Y.Z (YYYY-MM-DD)"}
(:require
[taoensso.encore :as enc :refer [have have?]]))

(defn runner
"Returns a new stateful (fn runner ([]) ([f])) such that:
(runner f) => Runner should execute given nullary fn according to runner opts.
Returns:
nil if runner has stopped accepting new execution requests.
true if fn was accepted for execution *without* back pressure.
false if runner's back-pressure mechanism was engaged.
(runner) => Runner should stop accepting new execution requests.
Returns true iff runner's status changed with this call.
Runners are a little like simplified agents with an explicit and configurable
back-pressure mechanism.
Options include:
`mode` - Specifies runner's mode of operation,
∈ #{:sync :async/blocking :async/dropping :async/sliding}.
`buffer-size` - Specifies size of async buffer before back-pressure
mechanism is engaged.
`n-threads` - Specifies number of threads for asynchronously executing fns.
NB execution order may be non-sequential when n > 1."

([opts] (runner nil opts))
([id ; Handy for memoization wrappers
{:keys [mode buffer-size n-threads thread-name daemon-threads?] :as opts
:or
{mode :async/sliding
buffer-size 1024
n-threads 1}}]

(case mode
:sync
(let [stopped?_ (atom false)]
(fn sync-runner
([ ] (compare-and-set! stopped?_ false true))
([f] (if @stopped?_ nil (do (enc/catching (f)) true)))))

(:async/blocking :async/dropping :async/sliding)
(let [stopped?_ (atom false)
abq (java.util.concurrent.ArrayBlockingQueue.
(enc/as-pos-int buffer-size) false)
init_
(delay
(when-not daemon-threads?
(.addShutdownHook (Runtime/getRuntime)
(Thread. (fn stop-runner [] (reset! stopped?_ true)))))

(dotimes [n (enc/as-pos-int n-threads)]
(let [wfn
(fn worker-fn []
(loop []
(if-let [f (.poll abq 2000 java.util.concurrent.TimeUnit/MILLISECONDS)]
;; Recur unconditionally to drain abq even when stopped
(do (enc/catching (f)) (recur))
(when-not @stopped?_ (recur)))))

thread-name (enc/str-join-once "-" [(or thread-name `runner) id "loop" (inc n) "of" n-threads])
thread (Thread. wfn thread-name)]

(when daemon-threads?
(.setDaemon thread true))

;; (println "Starting thread:" thread-name)
(.start thread))))

run-fn
(case mode
:async/blocking (fn [f] (or (.offer abq f) (.put abq f) false))
:async/dropping (fn [f] (or (.offer abq f) false))
:async/sliding
(fn [f]
(or
(.offer abq f) ; Common case
(loop []
(.poll abq) ; Drop
(if (.offer abq f)
false ; To indicate drop/s occurred
(recur))))))]

(fn async-runner
([ ] (compare-and-set! stopped?_ false true))
([f] (if @stopped?_ nil (do @init_ (run-fn f))))))

(enc/unexpected-arg! mode
{:context `runner
:expected #{:sync :async/blocking :async/dropping :async/sliding}}))))

(comment
(def get-runner (enc/memoize runner))

(enc/qb 1e6 ; [124.99 129.88]
(get-runner :my-runner1 {:mode :sync})
(get-runner :my-runner1 {:mode :async/blocking}))

(let [r1 (runner {:mode :sync})
r2 (runner {:mode :async/blocking})]
(enc/qb 1e6 ; [51.83 167.79]
(r1 (fn []))
(r2 (fn []))))

(enc/qb 1e6 ; [170.1 283.72]
((get-runner :my-runner1 {:mode :sync}) (fn []))
((get-runner :my-runner1 {:mode :async/blocking}) (fn []))))
38 changes: 37 additions & 1 deletion test/taoensso/encore_tests.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
;; [clojure.test.check.properties :as tc-props]
[clojure.string :as str]
[taoensso.encore :as enc]
[taoensso.encore.ctx-filter :as cf])
[taoensso.encore.ctx-filter :as cf]
#?(:clj [taoensso.encore.runner :as runner]))

#?(:cljs
(:require-macros
Expand Down Expand Up @@ -712,4 +713,39 @@

;;;;

#?(:clj
(deftest _runner
[(is (= (let [a (atom nil)
r (runner/runner {:mode :sync})]

[(r (fn [] (Thread/sleep 1000) (reset! a :done))) @a])

[true :done]))

(is (= (let [a (atom [])
r (runner/runner {:mode :async/dropping, :buffer-size 3})]

[(vec (for [n (range 6)] (r (fn [] (Thread/sleep 20) (swap! a conj n)))))
(do (Thread/sleep 200) @a)])

[[true true true false false false] [0 1 2]]))

(is (= (let [a (atom [])
r (runner/runner {:mode :async/sliding, :buffer-size 3})]

[(vec (for [n (range 6)] (r (fn [] (Thread/sleep 20) (swap! a conj n)))))
(do (Thread/sleep 200) @a)])

[[true true true false false false] [3 4 5]]))

(is (= (let [a (atom [])
r (runner/runner {:mode :async/blocking, :buffer-size 3})]

[(vec (for [n (range 6)] (r (fn [] (Thread/sleep 20) (swap! a conj n)))))
(do (Thread/sleep 200) @a)])

[[true true true false false false] [0 1 2 3 4 5]]))]))

;;;;

#?(:cljs (test/run-tests))

0 comments on commit 25fa687

Please sign in to comment.