diff --git a/deps.edn b/deps.edn index 4dd0c78..88f4672 100644 --- a/deps.edn +++ b/deps.edn @@ -1,7 +1,9 @@ -{:paths ["src"] - :deps {funcool/urania {:mvn/version "0.2.0"} - funcool/promesa {:mvn/version "5.1.0"} - org.clojure/tools.logging {:mvn/version "0.6.0"}} +{:paths ["src"] + :deps {funcool/urania {:git/url "https://github.com/green-labs/urania.git" + :sha "097afe6b7889baf8eff2ddc34937b61f36adc48c"} + ; urania 0.2.0의 다음 버전이 릴리즈되면 그 버전으로 업데이트 필요 + funcool/promesa {:mvn/version "5.1.0"} + org.clojure/tools.logging {:mvn/version "0.6.0"}} :mvn/repos {"central" {:url "https://repo1.maven.org/maven2/"} diff --git a/src/superlifter/api.cljc b/src/superlifter/api.cljc index 90c0335..1b75b4d 100644 --- a/src/superlifter/api.cljc +++ b/src/superlifter/api.cljc @@ -5,10 +5,13 @@ (defn unwrap ([p] (unwrap identity p)) - ([f p] + ([f p] (unwrap f identity p)) + ([then-f catch-f p] (if (prom/promise? p) - (prom/then p f) - (prom/resolved (f p))))) + (-> p + (prom/then then-f) + (prom/catch catch-f)) + (prom/resolved (then-f p))))) #?(:clj (defmacro def-fetcher [sym bindings do-fetch-fn] `(defrecord ~sym ~bindings diff --git a/src/superlifter/core.cljc b/src/superlifter/core.cljc index ba65a9f..9a9daa4 100644 --- a/src/superlifter/core.cljc +++ b/src/superlifter/core.cljc @@ -38,9 +38,12 @@ (fn [buckets] (update buckets bucket-id (comp f clear-ready)))) (get bucket-id))] - (if-let [muses (not-empty (get-in new [:queue :ready]))] - (let [cache (get-in new [:urania-opts :cache])] - (log :info "Fetching" (count muses) "muses from bucket" bucket-id) + (if-let [muses-and-promises (not-empty (get-in new [:queue :ready]))] + (let [cache (get-in new [:urania-opts :cache]) + + muses (map :muse muses-and-promises) + promises (map :promise muses-and-promises)] + (log :info "Fetching" (count muses-and-promises) "muses from bucket" bucket-id) (-> (u/execute! (u/collect muses) (merge (:urania-opts new) (when cache @@ -49,7 +52,14 @@ (fn [[result new-cache-value]] (when cache (urania-> cache new-cache-value)) - result)))) + (doall (map prom/resolve! promises result)))) + ;(run! (fn [[p result]] (prom/resolve! p result)) + ; (zipmap promises result)))) + (prom/catch + (fn [ex] + (doall (map prom/reject! promises (repeat ex))))))) + ;(run! (fn [p] (prom/reject! p ex)) + ; promises))))) (do (log :debug "Nothing ready to fetch for" bucket-id) (prom/resolved nil))))) @@ -72,37 +82,34 @@ The muses in the queue will all be fetched together when a trigger condition is met." ([context muse] (enqueue! context default-bucket-id muse)) ([context bucket-id muse] - (let [p (prom/deferred) - delivering-muse (u/map (fn [result] - (prom/resolve! p result) - result) - muse)] + (let [promise (prom/deferred)] (log :debug "Enqueuing muse into" bucket-id (:id muse)) (update-bucket! context bucket-id (fn [bucket] (reduce (fn [b trigger-fn] (trigger-fn b)) - (update-in bucket [:queue :waiting] conj delivering-muse) + (update-in bucket [:queue :waiting] conj {:muse muse + :promise promise}) (keep :enqueue-fn (vals (:triggers bucket)))))) - p))) + promise))) (defn- fetch-all-handling-errors! [context bucket-id] (try (prom/catch (fetch-bucket! context bucket-id) - (fn [error] - (log :warn "Fetch failed" error))) + (fn [error] + (log :warn "Fetch failed" error))) (catch Throwable t (log :warn "Fetch failed" t)))) (defmulti start-trigger! (fn [kind _context _bucket-id _opts] kind)) (defmethod start-trigger! :queue-size [_ _context _bucket-id {:keys [threshold] :as opts}] - (assoc opts :enqueue-fn (fn [{:keys [queue] :as bucket}] - (if (= threshold (count (:waiting queue))) - (-> bucket - (assoc-in [:queue :ready] (take threshold (:waiting queue))) - (update-in [:queue :waiting] #(drop threshold %))) - bucket)))) + (assoc opts :enqueue-fn (fn [{:keys [queue] :as bucket}] + (if (= threshold (count (:waiting queue))) + (-> bucket + (assoc-in [:queue :ready] (take threshold (:waiting queue))) + (update-in [:queue :waiting] #(drop threshold %))) + bucket)))) (defmethod start-trigger! :elastic [kind _context _bucket-id opts] (assoc opts diff --git a/src/superlifter/lacinia.clj b/src/superlifter/lacinia.clj index 9b0f6eb..527d963 100644 --- a/src/superlifter/lacinia.clj +++ b/src/superlifter/lacinia.clj @@ -15,7 +15,9 @@ (defn ->lacinia-promise [sl-result] (let [l-prom (resolve/resolve-promise)] - (api/unwrap #(resolve/deliver! l-prom %) (prom/catch sl-result prom/resolved)) + (api/unwrap (fn [result] (resolve/deliver! l-prom result)) + (fn [error] (resolve/deliver! l-prom nil {:message (.getMessage error)})) + sl-result) l-prom)) (defmacro with-superlifter [ctx body]