From 588a8c53815dead6213674d83dc05d4c35eb915b Mon Sep 17 00:00:00 2001 From: Peter Taoussanis Date: Tue, 29 Aug 2023 13:23:47 +0200 Subject: [PATCH] [new] Add new `runner` ns --- src/taoensso/encore/runner.clj | 128 ++++++++++++++++++++++++++++++++ test/taoensso/encore_tests.cljc | 36 ++++++++- 2 files changed, 163 insertions(+), 1 deletion(-) create mode 100644 src/taoensso/encore/runner.clj diff --git a/src/taoensso/encore/runner.clj b/src/taoensso/encore/runner.clj new file mode 100644 index 00000000..ff7ca448 --- /dev/null +++ b/src/taoensso/encore/runner.clj @@ -0,0 +1,128 @@ +(ns taoensso.encore.runner + "Alpha, subject to change without notice! + Runner util for easily configurable a/sync fn execution. + + Compared to agents: + - Runners have (configurable) back-pressure mechanism. + - Runners may be non-linear when n-threads > 1. + - Runners take nullary fns rather than unary fns of state. + - Runners have no validators or watches. + - Runners auto shutdown their threads on JVM shutdown." + + {:added "vX.Y.Z (YYYY-MM-DD)"} + (:require + [taoensso.encore :as enc :refer [have have?]])) + +(defn runner + "Returns a new stateful (fn runner ([]) ([f])) such that: + (runner f) => Runner should execute given nullary fn according to runner opts. + Returns: + nil if runner has stopped accepting new execution requests. + true if fn was accepted for execution *without* back pressure. + false if runner's back-pressure mechanism was engaged. + + (runner) => Runner should stop accepting new execution requests. + Returns true iff runner's status changed with this call. + + Runners are a little like simplified agents with an explicit and configurable + back-pressure mechanism. + + Options include: + + `mode` - Specifies runner's mode of operation, + ∈ #{:sync :async/blocking :async/dropping :async/sliding}. + + `buffer-size` - Specifies size of async buffer before back-pressure + mechanism is engaged. + + `n-threads` - Specifies number of threads for asynchronously executing fns. + NB execution order may be non-sequential when n > 1." + + ([opts] (runner nil opts)) + ([id ; Handy for memoization wrappers + {:keys [mode buffer-size n-threads thread-name daemon-threads?] :as opts + :or + {mode :async/sliding + buffer-size 1024 + n-threads 1}}] + + (case mode + :sync + (let [stopped?_ (volatile! false)] + (fn sync-runner + ([ ] (when-not @stopped?_ (vreset! stopped?_ true))) + ([f] (if @stopped?_ nil (do (enc/catching (f)) true))))) + + (:async/blocking :async/dropping :async/sliding) + (let [stopped?_ (volatile! false) + abq (java.util.concurrent.ArrayBlockingQueue. + (enc/as-pos-int buffer-size) false) + init_ + (delay + (when-not daemon-threads? + (.addShutdownHook (Runtime/getRuntime) + (Thread. (fn stop-runner [] (vreset! stopped?_ true))))) + + (dotimes [n (enc/as-pos-int n-threads)] + (let [wfn + (fn worker-fn [] + (loop [] + (if-let [f (.poll abq 2000 java.util.concurrent.TimeUnit/MILLISECONDS)] + ;; Recur unconditionally to drain abq even when stopped + (do (enc/catching (f)) (recur)) + (when-not @stopped?_ (recur))))) + + thread-name (enc/str-join-once "-" [(or thread-name `runner) id "loop" (inc n) "of" n-threads]) + thread (Thread. wfn thread-name)] + + (when daemon-threads? + (.setDaemon thread true)) + + ;; (println "Starting thread:" thread-name) + (.start thread)))) + + run-fn + (case mode + :async/blocking (fn [f] (or (.offer abq f) (.put abq f) false)) + :async/dropping (fn [f] (or (.offer abq f) false)) + :async/sliding + (fn [f] + (or + (.offer abq f) ; Common case + (loop [] + (.poll abq) ; Drop + (if (.offer abq f) + false ; Indicate that drop/s occurred + (recur))))))] + + (if-let [msecs (get opts :_debug/init-after)] + (do + (future (Thread/sleep (int msecs)) @init_) + (fn async-runner + ([ ] (when-not @stopped?_ (vreset! stopped?_ true))) + ([f] (if @stopped?_ nil (run-fn f))))) + + (fn async-runner + ([ ] (when-not @stopped?_ (vreset! stopped?_ true))) + ([f] (if @stopped?_ nil (do @init_ (run-fn f))))))) + + (enc/unexpected-arg! mode + {:context `runner + :expected #{:sync :async/blocking :async/dropping :async/sliding}})))) + +(comment + (def get-runner (enc/memoize runner)) + + (enc/qb 1e6 ; [124.99 129.88] + (get-runner :my-runner1 {:mode :sync}) + (get-runner :my-runner1 {:mode :async/blocking})) + + (let [r1 (runner {:mode :sync}) + r2 (runner {:mode :async/blocking})] + (enc/qb 1e6 ; [40.68 158.46] + (r1 (fn [])) + (r2 (fn [])))) + + (enc/qb 1e6 ; [170.1 283.72] + ((get-runner :my-runner1 {:mode :sync}) (fn [])) + ((get-runner :my-runner1 {:mode :async/blocking}) (fn [])))) diff --git a/test/taoensso/encore_tests.cljc b/test/taoensso/encore_tests.cljc index c2e84110..1e30ddf2 100644 --- a/test/taoensso/encore_tests.cljc +++ b/test/taoensso/encore_tests.cljc @@ -6,7 +6,8 @@ ;; [clojure.test.check.properties :as tc-props] [clojure.string :as str] [taoensso.encore :as enc] - [taoensso.encore.ctx-filter :as cf]) + [taoensso.encore.ctx-filter :as cf] + #?(:clj [taoensso.encore.runner :as runner])) #?(:cljs (:require-macros @@ -741,4 +742,37 @@ ;;;; +#?(:clj + (deftest _runner + [(is (= (let [a (atom nil) + r (runner/runner {:mode :sync})] + [(r (fn [] (Thread/sleep 1000) (reset! a :done))) @a]) + [true :done])) + + (is (= (let [a (atom []) + r (runner/runner {:_debug/init-after 100, :mode :async/dropping, :buffer-size 3})] + + [(vec (for [n (range 6)] (r (fn [] (Thread/sleep 20) (swap! a conj n))))) + (do (Thread/sleep 500) @a)]) + + [[true true true false false false] [0 1 2]])) + + (is (= (let [a (atom []) + r (runner/runner {:_debug/init-after 100, :mode :async/sliding, :buffer-size 3})] + + [(vec (for [n (range 6)] (r (fn [] (Thread/sleep 20) (swap! a conj n))))) + (do (Thread/sleep 500) @a)]) + + [[true true true false false false] [3 4 5]])) + + (is (= (let [a (atom []) + r (runner/runner {:_debug/init-after 100, :mode :async/blocking, :buffer-size 3})] + + [(vec (for [n (range 6)] (r (fn [] (Thread/sleep 20) (swap! a conj n))))) + (do (Thread/sleep 500) @a)]) + + [[true true true false false false] [0 1 2 3 4 5]]))])) + +;;;; + #?(:cljs (test/run-tests))