diff --git a/src/taoensso/encore/flow.cljc b/src/taoensso/encore/flow.cljc new file mode 100644 index 00000000..a09aa632 --- /dev/null +++ b/src/taoensso/encore/flow.cljc @@ -0,0 +1,158 @@ +(ns taoensso.encore.flow + "Alpha, subject to change without notice! + Low-level toolkit for building context flows. + Used by Telemere, etc. + + Flows provide: + - Namespaced dynamic state + - Wrapping init + - Wrapping or mutable namespaced updates + - Flow paths with delta timings" + + {:added "vX.Y.Z (YYYY-MM-DD)"} + (:require + [clojure.string :as str] + [taoensso.encore :as enc :refer [have have?]]) + + #?(:cljs + (:require-macros + [taoensso.encore.flow :as flow-macros :refer []]))) + +(comment (remove-ns 'taoensso.encore.flow)) + +;; Note: the common case for this API is to be wrapped by a consumer-level API. +;; Expects `flows`: { } dynamic var. + +(do + (deftype Flow [flow-id ^long total-nsecs path state ^long nst]) + (defrecord FlowInfo [flow-id ^long total-nsecs path state]) ; Public + (defrecord PathEntry [flow-id ^long nsecs-delta ^long count]) ; Public + (deftype FlowResult [form-result flow-info])) + +(defn ^:no-doc ^Flow new-flow + "Private implementation detail." + [incl-path? flow-id init-state] + (Flow. flow-id 0 + (when incl-path? [(PathEntry. :flow/start 0 1)]) + init-state (enc/now-nano*))) + +(comment (new-flow true :fid1 {})) + +(def ^:private aborted-flow + (Flow. :flow/aborted -1 nil nil -1)) + +(defn ^:no-doc ^Flow update-flow + "Private implementation detail." + [^Flow flow step-id update-spec] + (if (identical? flow aborted-flow) + flow ; Noop + (let [nst (enc/now-nano*) + nsecs-delta (- nst (.-nst flow)) + new-path + (when-let [old-path (.-path flow)] + (let [^PathEntry last-pe (peek old-path)] + (if (enc/identical-kw? step-id (.-flow-id last-pe)) + (conj (pop old-path) (PathEntry. step-id (.-nsecs-delta last-pe) (inc (.-count last-pe)))) + (conj old-path (PathEntry. step-id nsecs-delta 1))))) + + new-state + (let [old-state (.-state flow)] + (enc/cond + (nil? update-spec) old-state + (fn? update-spec) (update-spec old-state) + (map? update-spec) + (if old-state + (conj old-state update-spec) + (do update-spec)) + + :else + (enc/unexpected-arg! update-spec + {:context `update-flow + :expected '#{nil fn map}})))] + + (Flow. (.-flow-id flow) + (+ (.-total-nsecs flow) nsecs-delta) + new-path new-state nst)))) + +(comment (update-flow (new-flow true :fid1 {}) :fid2 #(assoc % :foo :bar))) + +(defn ^:no-doc flow-info + "Private implementation detail." + ([^Flow flow end-flow?] + (when-not (identical? flow aborted-flow) + (let [final-flow (if end-flow? (update-flow flow :flow/end nil) flow)] + (FlowInfo. + (.-flow-id flow) + (.-total-nsecs final-flow) + (.-path final-flow) + (.-state final-flow))))) + + ([flows] ; { } + (not-empty + (reduce-kv + (fn [m flow-id flow_] + (let [flow @flow_] + (if (identical? aborted-flow flow) + (dissoc m flow-id) + (assoc m flow-id (flow-info flow false))))) + flows + flows)))) + +#?(:clj + (defmacro with-flow + "Executes given form within newly-initialized flow and returns `FlowResult`." + [flows incl-path? [flow-id init-state] form] + `(let [flow-id# ~flow-id + flow_# (atom (new-flow ~incl-path? flow-id# ~init-state)) + form-result# (binding [~flows (assoc ~flows flow-id# flow_#)] ~form)] + (FlowResult. form-result# (flow-info @flow_# true))))) + +(defn get-flow + "Returns `Flow` or nil." + [flows flow-id] + (when-let [flow_ (get flows flow-id)] + (let [flow @flow_] + (if (identical? flow aborted-flow) + nil + flow)))) + +(defn ^:no-doc flow-id-pair + "Private implementation detail." + [id-spec] (if (vector? id-spec) id-spec [:_default id-spec])) + +(defn ^:no-doc -update-flow! + "Private implementation detail." + [flows parent-id child-id update-spec] + (when-let [flow_ (get flows parent-id)] + (swap! flow_ (fn [old] (update-flow old child-id update-spec))))) + +#?(:clj + (defmacro update-flow! + "Returns updated `Flow`, or nil if given `parent-id` flow isn't already established." + [flows id-spec update-spec] + (let [id-spec (if (enc/const-form? id-spec) (flow-id-pair id-spec) id-spec)] + (if (vector? id-spec) + (let [[parent-id child-id] id-spec] + `(when-let [flows# ~flows] + (-update-flow! flows# ~parent-id ~child-id ~update-spec))) + + `(when-let [flows# ~flows] + (let [[parent-id# child-id#] (flow-id-pair ~id-spec)] + (update-flow! flows# parent-id# child-id# ~update-spec))))))) + +(defn abort-flow! + "If the named flow exists, aborts it and returns true. + Otherwise returns false." + [flows flow-id] + (if-let [flow_ (get flows flow-id)] + (enc/reset!? flow_ aborted-flow) + false)) + +(comment + (do + (def ^:dynamic *flows* "{ }" nil) + (with-flow *flows* true [:fid1 {}] (+ 3 2))) + + (.-flow-info + (with-flow *flows* true [:fid1 {}] + (abort-flow! *flows* :fid1)))) diff --git a/test/taoensso/encore_tests.cljc b/test/taoensso/encore_tests.cljc index c7977130..c00dc5ad 100644 --- a/test/taoensso/encore_tests.cljc +++ b/test/taoensso/encore_tests.cljc @@ -7,7 +7,9 @@ [clojure.string :as str] [taoensso.encore :as enc] [taoensso.encore.runner :as runner] - [taoensso.encore.ctx-filter :as cf]) + [taoensso.encore.ctx-filter :as cf] + [taoensso.encore.flow :as flow #?@(:cljs (:refer [Flow FlowResult]))]) + #?(:clj (:import [taoensso.encore.flow Flow FlowResult])) #?(:cljs (:require-macros @@ -773,6 +775,103 @@ (is (->> (cf/filter? nil [["*" :info]] "ns1" nil) (enc/throws? :common "Invalid level"))) (is (->> (cf/filter? nil [["*" :info]] nil nil) (enc/throws? :common "Invalid level")))])])]) +;;;; Flow + +(def ^:dynamic *flows* "{ }" nil) + +(defn- submaps? [maps submaps] (enc/reduce-zip (fn [acc m sub] (and acc (enc/submap? m sub))) true maps submaps)) +(defn- subflow? [flow subflow] + (and + (enc/submap? (dissoc flow :path) (dissoc subflow :path)) + (submaps? (get flow :path) (get subflow :path)))) + +(deftest _flow + [(let [flow-result (flow/with-flow *flows* true [:fid1 {:a :A}] (+ 3 2))] + [(is (= (.-form-result flow-result) 5)) + (is (subflow? (.-flow-info flow-result) + {:flow-id :fid1, :state {:a :A} + :path + [{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1} + {:flow-id :flow/end, :nsecs-delta :submap/ex, :count 1}]}))]) + + (let [flow-result + (flow/with-flow *flows* true [:fid1 {:a :A}] + (do + (flow/update-flow! *flows* :fid2 {:b :B}) ; = [:_default :fid2] + (flow/update-flow! *flows* [:__nx :fid3] {:c :C}) + (flow/update-flow! *flows* [:fid1 :fid4] {:d :D}) + (flow/update-flow! *flows* [:fid1 :fid5] #(assoc % :e :E))))] + + [(is (subflow? (.-flow-info flow-result) + {:flow-id :fid1 + :state {:a :A :d :D :e :E} + :path + [{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1} + {:flow-id :fid4, :nsecs-delta :submap/ex, :count 1} + {:flow-id :fid5, :nsecs-delta :submap/ex, :count 1} + {:flow-id :flow/end, :nsecs-delta :submap/ex, :count 1}]}) + "Updates are conditional on parent id, map update-specs can be maps or fns.")]) + + (let [flow-result + (flow/with-flow *flows* true [:fid1 {:a :A}] + (dotimes [_ 100] (flow/update-flow! *flows* [:fid1 :fid2] nil)))] + + [(is (subflow? (.-flow-info flow-result) + {:path + [{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1} + {:flow-id :fid2, :nsecs-delta :submap/ex, :count 100} + {:flow-id :flow/end, :nsecs-delta :submap/ex, :count 1}]}) + "Recursion increases path entry count")]) + + (testing "Aborted flows" + (let [flow-result + (flow/with-flow *flows* true [:fid1 {:a :A}] + [(flow/abort-flow! *flows* :fid1) + (flow/abort-flow! *flows* :fid1) + (flow/update-flow! *flows* [:fid1 :fid2] {:b :B})])] + + [(let [[fr1 fr2 fr3] (.-form-result flow-result)] + [(is (= [fr1 fr2] [true false])) + (is (= (flow/flow-info fr3 true) nil)) + (is (= (flow/flow-info {:fid1 (atom fr3)}) nil))])])) + + (testing "Nested flows" + (let [flow-result + (flow/with-flow *flows* true [:fid-a1 {:a1 :A1}] + (flow/with-flow *flows* true [:fid-b1 {:b1 :B1}] + (do + (flow/update-flow! *flows* [:fid-a1 :fid-a2] {:a2 :A2}) + (flow/update-flow! *flows* [:fid-b1 :fid-b2] {:b2 :B2}) + (flow/flow-info *flows*)))) + + {:keys [fid-a1 fid-b1] :as _inner-flow-info} + (.-form-result ^FlowResult + (.-form-result flow-result))] + + [(is (subflow? fid-a1 + {:flow-id :fid-a1, + :state {:a1 :A1, :a2 :A2}, + :total-nsecs :submap/ex, + :path ; Note no :flow/end + [{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1} + {:flow-id :fid-a2, :nsecs-delta :submap/ex, :count 1}]})) + + (is (subflow? fid-b1 + {:flow-id :fid-b1, + :state {:b1 :B1, :b2 :B2}, + :total-nsecs :submap/ex, + :path ; Note no :flow/end + [{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1} + {:flow-id :fid-b2, :nsecs-delta :submap/ex, :count 1}]})) + + (is (subflow? (.flow-info flow-result) ; Outer flow-result + {:flow-id :fid-a1 + :state {:a1 :A1, :a2 :A2} + :path + [{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1} + {:flow-id :fid-a2, :nsecs-delta :submap/ex, :count 1} + {:flow-id :flow/end, :nsecs-delta :submap/ex, :count 1}]}))]))]) + ;;;; #?(:cljs (test/run-tests))