-
-
Notifications
You must be signed in to change notification settings - Fork 53
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
88551c5
commit 05ec4f3
Showing
2 changed files
with
258 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
(ns taoensso.encore.flow | ||
"Alpha, subject to change without notice! | ||
Low-level toolkit for building dynamic flows. | ||
Used by Telemere, etc. | ||
Flows provide: | ||
- Namespaced dynamic state | ||
- Wrapping init | ||
- Wrapping or mutable namespaced updates | ||
- Flow paths with delta timings" | ||
|
||
{:added "vX.Y.Z (YYYY-MM-DD)"} | ||
(:require | ||
[clojure.string :as str] | ||
[taoensso.encore :as enc :refer [have have?]]) | ||
|
||
#?(:cljs | ||
(:require-macros | ||
[taoensso.encore.flow :as flow-macros :refer []]))) | ||
|
||
(comment (remove-ns 'taoensso.encore.flow)) | ||
|
||
;; Note: the common case for this API is to be wrapped by a consumer-level API. | ||
;; Expects `flows`: {<flow-id> <Flow_>} dynamic var. | ||
|
||
(do | ||
(deftype Flow [flow-id ^long total-nsecs path state ^long nst]) | ||
(defrecord FlowInfo [flow-id ^long total-nsecs path state]) ; Public | ||
(defrecord PathEntry [flow-id ^long nsecs-delta ^long count]) ; Public | ||
(deftype FlowResult [form-result flow-info])) | ||
|
||
(defn ^:no-doc ^Flow new-flow | ||
"Private implementation detail." | ||
[incl-path? flow-id init-state] | ||
(Flow. flow-id 0 | ||
(when incl-path? [(PathEntry. :flow/start 0 1)]) | ||
init-state (enc/now-nano*))) | ||
|
||
(comment (new-flow true :fid1 {})) | ||
|
||
(def ^:private aborted-flow | ||
(Flow. :flow/aborted -1 nil nil -1)) | ||
|
||
(defn ^:no-doc ^Flow update-flow | ||
"Private implementation detail." | ||
[^Flow flow step-id update-spec] | ||
(if (identical? flow aborted-flow) | ||
flow ; Noop | ||
(let [nst (enc/now-nano*) | ||
nsecs-delta (- nst (.-nst flow)) | ||
new-path | ||
(when-let [old-path (.-path flow)] | ||
(let [^PathEntry last-pe (peek old-path)] | ||
(if (enc/identical-kw? step-id (.-flow-id last-pe)) | ||
(conj (pop old-path) (PathEntry. step-id (.-nsecs-delta last-pe) (inc (.-count last-pe)))) | ||
(conj old-path (PathEntry. step-id nsecs-delta 1))))) | ||
|
||
new-state | ||
(let [old-state (.-state flow)] | ||
(enc/cond | ||
(nil? update-spec) old-state | ||
(fn? update-spec) (update-spec old-state) | ||
(map? update-spec) | ||
(if old-state | ||
(conj old-state update-spec) | ||
(do update-spec)) | ||
|
||
:else | ||
(enc/unexpected-arg! update-spec | ||
{:context `update-flow | ||
:expected '#{nil fn map}})))] | ||
|
||
(Flow. (.-flow-id flow) | ||
(+ (.-total-nsecs flow) nsecs-delta) | ||
new-path new-state nst)))) | ||
|
||
(comment (update-flow (new-flow true :fid1 {}) :fid2 #(assoc % :foo :bar))) | ||
|
||
(defn ^:no-doc flow-info | ||
"Private implementation detail." | ||
([^Flow flow end-flow?] | ||
(when-not (identical? flow aborted-flow) | ||
(let [final-flow (if end-flow? (update-flow flow :flow/end nil) flow)] | ||
(FlowInfo. | ||
(.-flow-id flow) | ||
(.-total-nsecs final-flow) | ||
(.-path final-flow) | ||
(.-state final-flow))))) | ||
|
||
([flows] ; {<flow-id> <FlowInfo>} | ||
(not-empty | ||
(reduce-kv | ||
(fn [m flow-id flow_] | ||
(let [flow (flow_)] | ||
(if (identical? aborted-flow flow) | ||
(dissoc m flow-id) | ||
(assoc m flow-id (flow-info flow false))))) | ||
flows | ||
flows)))) | ||
|
||
#?(:clj | ||
(defmacro with-flow | ||
"Executes given form within newly-initialized flow and returns `FlowResult`." | ||
[flows incl-path? [flow-id init-state] form] | ||
`(let [flow-id# ~flow-id | ||
flow_# (enc/latom (new-flow ~incl-path? flow-id# ~init-state)) | ||
form-result# (binding [~flows (assoc ~flows flow-id# flow_#)] ~form)] | ||
(FlowResult. form-result# (flow-info (flow_#) true))))) | ||
|
||
(defn get-flow | ||
"Returns `Flow` or nil." | ||
[flows flow-id] | ||
(when-let [flow_ (get flows flow-id)] | ||
(let [flow (flow_)] | ||
(if (identical? flow aborted-flow) | ||
nil | ||
flow)))) | ||
|
||
(defn ^:no-doc flow-id-pair | ||
"Private implementation detail." | ||
[id-spec] (if (vector? id-spec) id-spec [:_default id-spec])) | ||
|
||
(defn ^:no-doc -update-flow! | ||
"Private implementation detail." | ||
[flows parent-id child-id update-spec] | ||
(when-let [flow_ (get flows parent-id)] | ||
(flow_ (fn swap-fn [old] (update-flow old child-id update-spec))))) | ||
|
||
#?(:clj | ||
(defmacro update-flow! | ||
"Returns updated `Flow`, or nil if given `parent-id` flow isn't already established." | ||
[flows id-spec update-spec] | ||
(let [id-spec (if (enc/const-form? id-spec) (flow-id-pair id-spec) id-spec)] | ||
(if (vector? id-spec) | ||
(let [[parent-id child-id] id-spec] | ||
`(when-let [flows# ~flows] | ||
(-update-flow! flows# ~parent-id ~child-id ~update-spec))) | ||
|
||
`(when-let [flows# ~flows] | ||
(let [[parent-id# child-id#] (flow-id-pair ~id-spec)] | ||
(update-flow! flows# parent-id# child-id# ~update-spec))))))) | ||
|
||
(defn abort-flow! | ||
"If the named flow exists, aborts it and returns true. | ||
Otherwise returns false." | ||
[flows flow-id] | ||
(if-let [flow_ (get flows flow-id)] | ||
(enc/reset!? flow_ aborted-flow) | ||
false)) | ||
|
||
(comment | ||
(do | ||
(def ^:dynamic *flows* "{<flow-id> <Flow_>}" nil) | ||
(with-flow *flows* true [:fid1 {}] (+ 3 2))) | ||
|
||
(.-flow-info | ||
(with-flow *flows* true [:fid1 {}] | ||
(abort-flow! *flows* :fid1)))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters