Skip to content

Commit

Permalink
[new] Add flow namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
ptaoussanis committed Sep 5, 2023
1 parent a369a08 commit 070c92f
Show file tree
Hide file tree
Showing 2 changed files with 258 additions and 1 deletion.
158 changes: 158 additions & 0 deletions src/taoensso/encore/flow.cljc
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
(ns taoensso.encore.flow
"Alpha, subject to change without notice!
Low-level toolkit for building context flows.
Used by Telemere, etc.
Flows provide:
- Namespaced dynamic state
- Wrapping init
- Wrapping or mutable namespaced updates
- Flow paths with delta timings"

{:added "vX.Y.Z (YYYY-MM-DD)"}
(:require
[clojure.string :as str]
[taoensso.encore :as enc :refer [have have?]])

#?(:cljs
(:require-macros
[taoensso.encore.flow :as flow-macros :refer []])))

(comment (remove-ns 'taoensso.encore.flow))

;; Note: the common case for this API is to be wrapped by a consumer-level API.
;; Expects `flows`: {<flow-id> <Flow_>} dynamic var.

(do
(deftype Flow [flow-id ^long total-nsecs path state ^long nst])
(defrecord FlowInfo [flow-id ^long total-nsecs path state]) ; Public
(defrecord PathEntry [flow-id ^long nsecs-delta ^long count]) ; Public
(deftype FlowResult [form-result flow-info]))

(defn ^:no-doc ^Flow new-flow
"Private implementation detail."
[incl-path? flow-id init-state]
(Flow. flow-id 0
(when incl-path? [(PathEntry. :flow/start 0 1)])
init-state (enc/now-nano*)))

(comment (new-flow true :fid1 {}))

(def ^:private aborted-flow
(Flow. :flow/aborted -1 nil nil -1))

(defn ^:no-doc ^Flow update-flow
"Private implementation detail."
[^Flow flow step-id update-spec]
(if (identical? flow aborted-flow)
flow ; Noop
(let [nst (enc/now-nano*)
nsecs-delta (- nst (.-nst flow))
new-path
(when-let [old-path (.-path flow)]
(let [^PathEntry last-pe (peek old-path)]
(if (enc/identical-kw? step-id (.-flow-id last-pe))
(conj (pop old-path) (PathEntry. step-id (.-nsecs-delta last-pe) (inc (.-count last-pe))))
(conj old-path (PathEntry. step-id nsecs-delta 1)))))

new-state
(let [old-state (.-state flow)]
(enc/cond
(nil? update-spec) old-state
(fn? update-spec) (update-spec old-state)
(map? update-spec)
(if old-state
(conj old-state update-spec)
(do update-spec))

:else
(enc/unexpected-arg! update-spec
{:context `update-flow
:expected '#{nil fn map}})))]

(Flow. (.-flow-id flow)
(+ (.-total-nsecs flow) nsecs-delta)
new-path new-state nst))))

(comment (update-flow (new-flow true :fid1 {}) :fid2 #(assoc % :foo :bar)))

(defn ^:no-doc flow-info
"Private implementation detail."
([^Flow flow end-flow?]
(when-not (identical? flow aborted-flow)
(let [final-flow (if end-flow? (update-flow flow :flow/end nil) flow)]
(FlowInfo.
(.-flow-id flow)
(.-total-nsecs final-flow)
(.-path final-flow)
(.-state final-flow)))))

([flows] ; {<flow-id> <FlowInfo>}
(not-empty
(reduce-kv
(fn [m flow-id flow_]
(let [flow @flow_]
(if (identical? aborted-flow flow)
(dissoc m flow-id)
(assoc m flow-id (flow-info flow false)))))
flows
flows))))

#?(:clj
(defmacro with-flow
"Executes given form within newly-initialized flow and returns `FlowResult`."
[flows incl-path? [flow-id init-state] form]
`(let [flow-id# ~flow-id
flow_# (atom (new-flow ~incl-path? flow-id# ~init-state))
form-result# (binding [~flows (assoc ~flows flow-id# flow_#)] ~form)]
(FlowResult. form-result# (flow-info @flow_# true)))))

(defn get-flow
"Returns `Flow` or nil."
[flows flow-id]
(when-let [flow_ (get flows flow-id)]
(let [flow @flow_]
(if (identical? flow aborted-flow)
nil
flow))))

(defn ^:no-doc flow-id-pair
"Private implementation detail."
[id-spec] (if (vector? id-spec) id-spec [:_default id-spec]))

(defn ^:no-doc -update-flow!
"Private implementation detail."
[flows parent-id child-id update-spec]
(when-let [flow_ (get flows parent-id)]
(swap! flow_ (fn [old] (update-flow old child-id update-spec)))))

#?(:clj
(defmacro update-flow!
"Returns updated `Flow`, or nil if given `parent-id` flow isn't already established."
[flows id-spec update-spec]
(let [id-spec (if (enc/const-form? id-spec) (flow-id-pair id-spec) id-spec)]
(if (vector? id-spec)
(let [[parent-id child-id] id-spec]
`(when-let [flows# ~flows]
(-update-flow! flows# ~parent-id ~child-id ~update-spec)))

`(when-let [flows# ~flows]
(let [[parent-id# child-id#] (flow-id-pair ~id-spec)]
(update-flow! flows# parent-id# child-id# ~update-spec)))))))

(defn abort-flow!
"If the named flow exists, aborts it and returns true.
Otherwise returns false."
[flows flow-id]
(if-let [flow_ (get flows flow-id)]
(enc/reset!? flow_ aborted-flow)
false))

(comment
(do
(def ^:dynamic *flows* "{<flow-id> <Flow_>}" nil)
(with-flow *flows* true [:fid1 {}] (+ 3 2)))

(.-flow-info
(with-flow *flows* true [:fid1 {}]
(abort-flow! *flows* :fid1))))
101 changes: 100 additions & 1 deletion test/taoensso/encore_tests.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
[clojure.string :as str]
[taoensso.encore :as enc]
[taoensso.encore.runner :as runner]
[taoensso.encore.ctx-filter :as cf])
[taoensso.encore.ctx-filter :as cf]
[taoensso.encore.flow :as flow #?@(:cljs (:refer [Flow FlowResult]))])
#?(:clj (:import [taoensso.encore.flow Flow FlowResult]))

#?(:cljs
(:require-macros
Expand Down Expand Up @@ -773,6 +775,103 @@
(is (->> (cf/filter? nil [["*" :info]] "ns1" nil) (enc/throws? :common "Invalid level")))
(is (->> (cf/filter? nil [["*" :info]] nil nil) (enc/throws? :common "Invalid level")))])])])

;;;; Flow

(def ^:dynamic *flows* "{<flow-id> <Flow_>}" nil)

(defn- submaps? [maps submaps] (enc/reduce-zip (fn [acc m sub] (and acc (enc/submap? m sub))) true maps submaps))
(defn- subflow? [flow subflow]
(and
(enc/submap? (dissoc flow :path) (dissoc subflow :path))
(submaps? (get flow :path) (get subflow :path))))

(deftest _flow
[(let [flow-result (flow/with-flow *flows* true [:fid1 {:a :A}] (+ 3 2))]
[(is (= (.-form-result flow-result) 5))
(is (subflow? (.-flow-info flow-result)
{:flow-id :fid1, :state {:a :A}
:path
[{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1}
{:flow-id :flow/end, :nsecs-delta :submap/ex, :count 1}]}))])

(let [flow-result
(flow/with-flow *flows* true [:fid1 {:a :A}]
(do
(flow/update-flow! *flows* :fid2 {:b :B}) ; = [:_default :fid2]
(flow/update-flow! *flows* [:__nx :fid3] {:c :C})
(flow/update-flow! *flows* [:fid1 :fid4] {:d :D})
(flow/update-flow! *flows* [:fid1 :fid5] #(assoc % :e :E))))]

[(is (subflow? (.-flow-info flow-result)
{:flow-id :fid1
:state {:a :A :d :D :e :E}
:path
[{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1}
{:flow-id :fid4, :nsecs-delta :submap/ex, :count 1}
{:flow-id :fid5, :nsecs-delta :submap/ex, :count 1}
{:flow-id :flow/end, :nsecs-delta :submap/ex, :count 1}]})
"Updates are conditional on parent id, map update-specs can be maps or fns.")])

(let [flow-result
(flow/with-flow *flows* true [:fid1 {:a :A}]
(dotimes [_ 100] (flow/update-flow! *flows* [:fid1 :fid2] nil)))]

[(is (subflow? (.-flow-info flow-result)
{:path
[{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1}
{:flow-id :fid2, :nsecs-delta :submap/ex, :count 100}
{:flow-id :flow/end, :nsecs-delta :submap/ex, :count 1}]})
"Recursion increases path entry count")])

(testing "Aborted flows"
(let [flow-result
(flow/with-flow *flows* true [:fid1 {:a :A}]
[(flow/abort-flow! *flows* :fid1)
(flow/abort-flow! *flows* :fid1)
(flow/update-flow! *flows* [:fid1 :fid2] {:b :B})])]

[(let [[fr1 fr2 fr3] (.-form-result flow-result)]
[(is (= [fr1 fr2] [true false]))
(is (= (flow/flow-info fr3 true) nil))
(is (= (flow/flow-info {:fid1 (atom fr3)}) nil))])]))

(testing "Nested flows"
(let [flow-result
(flow/with-flow *flows* true [:fid-a1 {:a1 :A1}]
(flow/with-flow *flows* true [:fid-b1 {:b1 :B1}]
(do
(flow/update-flow! *flows* [:fid-a1 :fid-a2] {:a2 :A2})
(flow/update-flow! *flows* [:fid-b1 :fid-b2] {:b2 :B2})
(flow/flow-info *flows*))))

{:keys [fid-a1 fid-b1] :as _inner-flow-info}
(.-form-result ^FlowResult
(.-form-result flow-result))]

[(is (subflow? fid-a1
{:flow-id :fid-a1,
:state {:a1 :A1, :a2 :A2},
:total-nsecs :submap/ex,
:path ; Note no :flow/end
[{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1}
{:flow-id :fid-a2, :nsecs-delta :submap/ex, :count 1}]}))

(is (subflow? fid-b1
{:flow-id :fid-b1,
:state {:b1 :B1, :b2 :B2},
:total-nsecs :submap/ex,
:path ; Note no :flow/end
[{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1}
{:flow-id :fid-b2, :nsecs-delta :submap/ex, :count 1}]}))

(is (subflow? (.flow-info flow-result) ; Outer flow-result
{:flow-id :fid-a1
:state {:a1 :A1, :a2 :A2}
:path
[{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1}
{:flow-id :fid-a2, :nsecs-delta :submap/ex, :count 1}
{:flow-id :flow/end, :nsecs-delta :submap/ex, :count 1}]}))]))])

;;;;

#?(:cljs (test/run-tests))

0 comments on commit 070c92f

Please sign in to comment.