-
-
Notifications
You must be signed in to change notification settings - Fork 53
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
a369a08
commit 4e8ccb0
Showing
2 changed files
with
238 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
(ns taoensso.encore.flow | ||
"Alpha, subject to change without notice! | ||
Low-level toolkit for building context flows. | ||
Used by Telemere, etc. | ||
Flows provide: | ||
- Namespaced dynamic state | ||
- Wrapping init | ||
- Wrapping or mutable namespaced updates | ||
- Flow paths with delta timings" | ||
|
||
{:added "vX.Y.Z (YYYY-MM-DD)"} | ||
(:require | ||
[clojure.string :as str] | ||
[taoensso.encore :as enc :refer [have have?]]) | ||
|
||
#?(:clj (:import [java.util.concurrent.atomic AtomicReference])) | ||
|
||
#?(:cljs | ||
(:require-macros | ||
[taoensso.encore.flow :as flow-macros :refer []]))) | ||
|
||
(comment (remove-ns 'taoensso.encore.flow)) | ||
|
||
;;;; TODO | ||
;; - Review `abort-flow!` intention and implementation | ||
;; - Add `abort-flow!` tests | ||
|
||
(do | ||
(deftype Flow [ ^long total-nsecs path state ^long nst]) | ||
(defrecord FlowInfo [flow-id ^long total-nsecs path state]) | ||
(defrecord PathEntry [flow-id ^long nsecs-delta ^long count]) | ||
(deftype FlowResult [form-result ^FlowInfo flow-info])) | ||
|
||
(defn ^:no-doc ^Flow new-flow | ||
"Private implementation detail." | ||
[incl-path? init-state] | ||
(Flow. 0 | ||
(when incl-path? [(PathEntry. :flow/start 0 1)]) | ||
init-state (enc/now-nano*))) | ||
|
||
(comment (new-flow true {})) | ||
|
||
(def ^:no-doc aborted-flow | ||
"Private implementation detail." | ||
(Flow. -1 nil :flow/aborted 0)) | ||
|
||
(defn ^:no-doc ^Flow update-flow | ||
"Private implementation detail." | ||
[^Flow flow flow-id update-spec] | ||
(if (identical? flow aborted-flow) | ||
flow ; Noop | ||
(let [nst (enc/now-nano*) | ||
nsecs-delta (- nst (.-nst flow)) | ||
new-path | ||
(when-let [old-path (.-path flow)] | ||
(let [^PathEntry last-pe (peek old-path)] | ||
(if (enc/identical-kw? flow-id (.-flow-id last-pe)) | ||
(conj (pop old-path) (PathEntry. flow-id (.-nsecs-delta last-pe) (inc (.-count last-pe)))) | ||
(conj old-path (PathEntry. flow-id nsecs-delta 1))))) | ||
|
||
new-state | ||
(let [old-state (.-state flow)] | ||
(enc/cond | ||
(nil? update-spec) old-state | ||
(fn? update-spec) (update-spec old-state) | ||
(map? update-spec) | ||
(if old-state | ||
(conj old-state update-spec) | ||
(do update-spec)) | ||
|
||
:else | ||
(enc/unexpected-arg! update-spec | ||
{:context `update-flow | ||
:expected '#{nil fn map}})))] | ||
|
||
(Flow. (+ (.-total-nsecs flow) nsecs-delta) | ||
new-path new-state nst)))) | ||
|
||
(comment (update-flow (new-flow true {}) :fid1 #(assoc % :foo :bar))) | ||
|
||
(defn ^:no-doc flow-info | ||
"Private implementation detail." | ||
(^FlowInfo [flow-id ^Flow flow end-flow?] | ||
(let [final-flow (if end-flow? (update-flow flow :flow/end nil) flow)] | ||
(FlowInfo. flow-id | ||
(.-total-nsecs final-flow) | ||
(.-path final-flow) | ||
(.-state final-flow)))) | ||
|
||
([flows] ; {<flow-id> <FlowInfo>} | ||
(reduce-kv | ||
(fn [m flow-id flow_] | ||
(let [flow @flow_] | ||
(if (identical? aborted-flow flow) | ||
(do m) | ||
(assoc m flow-id (flow-info flow-id flow false))))) | ||
flows | ||
flows))) | ||
|
||
#?(:clj | ||
(defmacro with-flow | ||
"Executes given form within newly-initialized flow and returns `FlowResult`." | ||
[flows-var incl-path? [flow-id init-state] form] | ||
`(let [flow-id# ~flow-id | ||
flow_# (atom (new-flow ~incl-path? ~init-state)) | ||
form-result# (binding [~flows-var (assoc ~flows-var flow-id# flow_#)] ~form)] | ||
(FlowResult. form-result# (flow-info flow-id# @flow_# true))))) | ||
|
||
#?(:clj | ||
(defmacro get-flow_ | ||
"Returns `Flow` atom, or nil." | ||
[flows-var flow-id] | ||
`(when-let [flows# ~flows-var] | ||
(get flows# ~flow-id)))) | ||
|
||
(defn ^:no-doc flow-id-pair | ||
"Private implementation detail." | ||
[id-spec] (if (vector? id-spec) id-spec [:_default id-spec])) | ||
|
||
#?(:clj | ||
(defmacro update-flow! | ||
"Returns updated `Flow`, or nil if given `parent-id` flow isn't already established." | ||
([flows-var parent-id child-id update-spec] | ||
`(when-let [flow_# (get-flow_ ~flows-var ~parent-id)] | ||
(swap! flow_# (fn [old#] (update-flow old# ~child-id ~update-spec))))) | ||
|
||
([flows-var id-spec update-spec] | ||
(let [id-spec (if (enc/const-form? id-spec) (flow-id-pair id-spec) id-spec)] | ||
(if (vector? id-spec) | ||
(let [[parent-id child-id] id-spec] `(update-flow! ~flows-var ~parent-id ~child-id ~update-spec)) | ||
`(let [[parent-id# child-id#] (flow-id-pair ~id-spec)] (update-flow! ~flows-var parent-id# child-id# ~update-spec))))))) | ||
|
||
(defmacro abort-flow! | ||
"If the named flow exists, aborts it and returns true. | ||
Otherwise returns false." | ||
[flows-var flow-id] | ||
`(let [flow-id# ~flow-id] | ||
(if-let [flow_# (get-flow_ ~flows-var flow-id#)] | ||
(enc/reset!? flow_# aborted-flow) | ||
false))) | ||
|
||
(comment | ||
(do | ||
(def ^:dynamic *flows* "{<flow-id> <Flow_>}" nil) | ||
(with-flow *flows* true [:fid1 {}] (+ 3 2))) | ||
|
||
(.-flow-info | ||
(with-flow *flows* true [:fid1 {}] | ||
(abort-flow! *flows* :fid1)))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters