Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

비동기 pooled executor로 변경 #11

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions src/superlifter/core.cljc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
(ns superlifter.core
(:require [urania.core :as u]
[promesa.core :as prom]
[superlifter.future :refer [future-with-pooled-executor]]
#?(:clj [superlifter.logging :refer [log]]
:cljs [superlifter.logging :refer-macros [log]]))
(:refer-clojure :exclude [resolve]))
Expand Down Expand Up @@ -123,10 +124,10 @@

(defmethod start-trigger! :interval [_ bucket-id opts]
(let [start-fn #?(:clj (fn [context]
(let [watcher (future (loop []
(Thread/sleep (:interval opts))
(fetch-all-handling-errors! context bucket-id)
(recur)))]
(let [watcher (future-with-pooled-executor (loop []
(Thread/sleep (:interval opts))
(fetch-all-handling-errors! context bucket-id)
(recur)))]
;; return a function to stop the watcher
#(future-cancel watcher)))
:cljs (fn [context]
Expand Down Expand Up @@ -157,22 +158,22 @@
(let [interval (:interval opts)
last-updated (atom nil)
start-fn #?(:clj (fn [context]
(let [watcher (future (loop []
(let [lu @last-updated]
(cond
(nil? lu) (do (Thread/sleep interval)
(recur))

(= :exit lu) nil

(<= interval (- (System/currentTimeMillis) lu))
(do (fetch-all-handling-errors! context bucket-id)
(compare-and-set! last-updated lu nil)
(recur))

:else
(do (Thread/sleep (- interval (- (System/currentTimeMillis) lu)))
(recur))))))]
(let [watcher (future-with-pooled-executor (loop []
(let [lu @last-updated]
(cond
(nil? lu) (do (Thread/sleep interval)
(recur))

(= :exit lu) nil

(<= interval (- (System/currentTimeMillis) lu))
(do (fetch-all-handling-errors! context bucket-id)
(compare-and-set! last-updated lu nil)
(recur))

:else
(do (Thread/sleep (- interval (- (System/currentTimeMillis) lu)))
(recur))))))]

;; return a function to stop the watcher
(fn []
Expand Down
74 changes: 74 additions & 0 deletions src/superlifter/future.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
(ns superlifter.future
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

core에 같이 넣기는 애매한것같아서 future라는 ns를 만들고 core 함수를 가져와서 executor를 주입할 수 있도록 일부 수정했습니다.
private으로 설정된 녀석들도 있어서 불가피하게 같이 가져왔어요.

(:import
java.util.concurrent.Callable))

(def default-executor
clojure.lang.Agent/pooledExecutor)

(defn binding-conveyor-fn
{:private true
:added "1.3"}
[f]
(let [frame (clojure.lang.Var/cloneThreadBindingFrame)]
(fn
([]
(clojure.lang.Var/resetThreadBindingFrame frame)
(f))
([x]
(clojure.lang.Var/resetThreadBindingFrame frame)
(f x))
([x y]
(clojure.lang.Var/resetThreadBindingFrame frame)
(f x y))
([x y z]
(clojure.lang.Var/resetThreadBindingFrame frame)
(f x y z))
([x y z & args]
(clojure.lang.Var/resetThreadBindingFrame frame)
(apply f x y z args)))))

(defn ^:private deref-future
([^java.util.concurrent.Future fut]
(.get fut))
([^java.util.concurrent.Future fut timeout-ms timeout-val]
(try (.get fut timeout-ms java.util.concurrent.TimeUnit/MILLISECONDS)
(catch java.util.concurrent.TimeoutException e
timeout-val))))

(defn future-call-with-executor
"Takes a function of no args (and optionally an ExecutorService
instance that will run the task, by default it uses pooledExecutor)
and yields a future object that will invoke the function in another thread, and will
cache the result and return it on all subsequent calls to
deref/@. If the computation has not yet finished, calls to deref/@
will block, unless the variant of deref with timeout is used. See
also - realized?."
([f executor]
(let [f (binding-conveyor-fn f)
fut (.submit ^java.util.concurrent.ExecutorService executor
^Callable f)]
(reify
clojure.lang.IDeref
(deref [_] (deref-future fut))
clojure.lang.IBlockingDeref
(deref
[_ timeout-ms timeout-val]
(deref-future fut timeout-ms timeout-val))
clojure.lang.IPending
(isRealized [_] (.isDone fut))
java.util.concurrent.Future
(get [_] (.get fut))
(get [_ timeout unit] (.get fut timeout unit))
(isCancelled [_] (.isCancelled fut))
(isDone [_] (.isDone fut))
(cancel [_ interrupt?] (.cancel fut interrupt?)))))
([f]
(future-call-with-executor f clojure.lang.Agent/pooledExecutor)))

(defmacro future-with-pooled-executor
"Takes a executor and body of expressions and yields a future object that will
invoke the body in another thread, and will cache the result and
return it on all subsequent calls to deref/@. If the computation has
not yet finished, calls to deref/@ will block, unless the variant of
deref with timeout is used. See also - realized?."
([& body] `(future-call-with-executor (^{:once true} fn* [] ~@body) default-executor)))