From 4e8ccb0be842a07bbec002249b2f241438bfbb00 Mon Sep 17 00:00:00 2001 From: Peter Taoussanis Date: Sat, 2 Sep 2023 12:16:27 +0200 Subject: [PATCH] [new] Add `flow` namespace --- src/taoensso/encore/flow.cljc | 150 ++++++++++++++++++++++++++++++++ test/taoensso/encore_tests.cljc | 89 ++++++++++++++++++- 2 files changed, 238 insertions(+), 1 deletion(-) create mode 100644 src/taoensso/encore/flow.cljc diff --git a/src/taoensso/encore/flow.cljc b/src/taoensso/encore/flow.cljc new file mode 100644 index 00000000..a6a3aa7b --- /dev/null +++ b/src/taoensso/encore/flow.cljc @@ -0,0 +1,150 @@ +(ns taoensso.encore.flow + "Alpha, subject to change without notice! + Low-level toolkit for building context flows. + Used by Telemere, etc. + + Flows provide: + - Namespaced dynamic state + - Wrapping init + - Wrapping or mutable namespaced updates + - Flow paths with delta timings" + + {:added "vX.Y.Z (YYYY-MM-DD)"} + (:require + [clojure.string :as str] + [taoensso.encore :as enc :refer [have have?]]) + + #?(:clj (:import [java.util.concurrent.atomic AtomicReference])) + + #?(:cljs + (:require-macros + [taoensso.encore.flow :as flow-macros :refer []]))) + +(comment (remove-ns 'taoensso.encore.flow)) + +;;;; TODO +;; - Review `abort-flow!` intention and implementation +;; - Add `abort-flow!` tests + +(do + (deftype Flow [ ^long total-nsecs path state ^long nst]) + (defrecord FlowInfo [flow-id ^long total-nsecs path state]) + (defrecord PathEntry [flow-id ^long nsecs-delta ^long count]) + (deftype FlowResult [form-result ^FlowInfo flow-info])) + +(defn ^:no-doc ^Flow new-flow + "Private implementation detail." + [incl-path? init-state] + (Flow. 0 + (when incl-path? [(PathEntry. :flow/start 0 1)]) + init-state (enc/now-nano*))) + +(comment (new-flow true {})) + +(def ^:no-doc aborted-flow + "Private implementation detail." + (Flow. -1 nil :flow/aborted 0)) + +(defn ^:no-doc ^Flow update-flow + "Private implementation detail." + [^Flow flow flow-id update-spec] + (if (identical? flow aborted-flow) + flow ; Noop + (let [nst (enc/now-nano*) + nsecs-delta (- nst (.-nst flow)) + new-path + (when-let [old-path (.-path flow)] + (let [^PathEntry last-pe (peek old-path)] + (if (enc/identical-kw? flow-id (.-flow-id last-pe)) + (conj (pop old-path) (PathEntry. flow-id (.-nsecs-delta last-pe) (inc (.-count last-pe)))) + (conj old-path (PathEntry. flow-id nsecs-delta 1))))) + + new-state + (let [old-state (.-state flow)] + (enc/cond + (nil? update-spec) old-state + (fn? update-spec) (update-spec old-state) + (map? update-spec) + (if old-state + (conj old-state update-spec) + (do update-spec)) + + :else + (enc/unexpected-arg! update-spec + {:context `update-flow + :expected '#{nil fn map}})))] + + (Flow. (+ (.-total-nsecs flow) nsecs-delta) + new-path new-state nst)))) + +(comment (update-flow (new-flow true {}) :fid1 #(assoc % :foo :bar))) + +(defn ^:no-doc flow-info + "Private implementation detail." + (^FlowInfo [flow-id ^Flow flow end-flow?] + (let [final-flow (if end-flow? (update-flow flow :flow/end nil) flow)] + (FlowInfo. flow-id + (.-total-nsecs final-flow) + (.-path final-flow) + (.-state final-flow)))) + + ([flows] ; { } + (reduce-kv + (fn [m flow-id flow_] + (let [flow @flow_] + (if (identical? aborted-flow flow) + (do m) + (assoc m flow-id (flow-info flow-id flow false))))) + flows + flows))) + +#?(:clj + (defmacro with-flow + "Executes given form within newly-initialized flow and returns `FlowResult`." + [flows-var incl-path? [flow-id init-state] form] + `(let [flow-id# ~flow-id + flow_# (atom (new-flow ~incl-path? ~init-state)) + form-result# (binding [~flows-var (assoc ~flows-var flow-id# flow_#)] ~form)] + (FlowResult. form-result# (flow-info flow-id# @flow_# true))))) + +#?(:clj + (defmacro get-flow_ + "Returns `Flow` atom, or nil." + [flows-var flow-id] + `(when-let [flows# ~flows-var] + (get flows# ~flow-id)))) + +(defn ^:no-doc flow-id-pair + "Private implementation detail." + [id-spec] (if (vector? id-spec) id-spec [:_default id-spec])) + +#?(:clj + (defmacro update-flow! + "Returns updated `Flow`, or nil if given `parent-id` flow isn't already established." + ([flows-var parent-id child-id update-spec] + `(when-let [flow_# (get-flow_ ~flows-var ~parent-id)] + (swap! flow_# (fn [old#] (update-flow old# ~child-id ~update-spec))))) + + ([flows-var id-spec update-spec] + (let [id-spec (if (enc/const-form? id-spec) (flow-id-pair id-spec) id-spec)] + (if (vector? id-spec) + (let [[parent-id child-id] id-spec] `(update-flow! ~flows-var ~parent-id ~child-id ~update-spec)) + `(let [[parent-id# child-id#] (flow-id-pair ~id-spec)] (update-flow! ~flows-var parent-id# child-id# ~update-spec))))))) + +(defmacro abort-flow! + "If the named flow exists, aborts it and returns true. + Otherwise returns false." + [flows-var flow-id] + `(let [flow-id# ~flow-id] + (if-let [flow_# (get-flow_ ~flows-var flow-id#)] + (enc/reset!? flow_# aborted-flow) + false))) + +(comment + (do + (def ^:dynamic *flows* "{ }" nil) + (with-flow *flows* true [:fid1 {}] (+ 3 2))) + + (.-flow-info + (with-flow *flows* true [:fid1 {}] + (abort-flow! *flows* :fid1)))) diff --git a/test/taoensso/encore_tests.cljc b/test/taoensso/encore_tests.cljc index c7977130..7d633d71 100644 --- a/test/taoensso/encore_tests.cljc +++ b/test/taoensso/encore_tests.cljc @@ -7,7 +7,9 @@ [clojure.string :as str] [taoensso.encore :as enc] [taoensso.encore.runner :as runner] - [taoensso.encore.ctx-filter :as cf]) + [taoensso.encore.ctx-filter :as cf] + [taoensso.encore.flow :as flow #?@(:cljs (:refer [Flow FlowResult]))]) + #?(:clj (:import [taoensso.encore.flow Flow FlowResult])) #?(:cljs (:require-macros @@ -773,6 +775,91 @@ (is (->> (cf/filter? nil [["*" :info]] "ns1" nil) (enc/throws? :common "Invalid level"))) (is (->> (cf/filter? nil [["*" :info]] nil nil) (enc/throws? :common "Invalid level")))])])]) +;;;; Flow + +(def ^:dynamic *flows* "{ }" nil) + +(defn- submaps? [maps submaps] (enc/reduce-zip (fn [acc m sub] (and acc (enc/submap? m sub))) true maps submaps)) +(defn- subflow? [flow subflow] + (and + (enc/submap? (dissoc flow :path) (dissoc subflow :path)) + (submaps? (get flow :path) (get subflow :path)))) + +(deftest _flow + [(let [flow-result (flow/with-flow *flows* true [:fid1 {:a :A}] (+ 3 2))] + [(is (= (.-form-result flow-result) 5)) + (is (subflow? (.-flow-info flow-result) + {:flow-id :fid1, :state {:a :A} + :path + [{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1} + {:flow-id :flow/end, :nsecs-delta :submap/ex, :count 1}]}))]) + + (let [flow-result + (flow/with-flow *flows* true [:fid1 {:a :A}] + (do + (flow/update-flow! *flows* :fid2 {:b :B}) ; = [:_default :fid2] + (flow/update-flow! *flows* [:__nx :fid3] {:c :C}) + (flow/update-flow! *flows* [:fid1 :fid4] {:d :D}) + (flow/update-flow! *flows* [:fid1 :fid5] #(assoc % :e :E))))] + + [(is (subflow? (.-flow-info flow-result) + {:flow-id :fid1 + :state {:a :A :d :D :e :E} + :path + [{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1} + {:flow-id :fid4, :nsecs-delta :submap/ex, :count 1} + {:flow-id :fid5, :nsecs-delta :submap/ex, :count 1} + {:flow-id :flow/end, :nsecs-delta :submap/ex, :count 1}]}) + "Updates are conditional on parent id, map update-specs can be maps or fns.")]) + + (let [flow-result + (flow/with-flow *flows* true [:fid1 {:a :A}] + (dotimes [_ 100] (flow/update-flow! *flows* [:fid1 :fid2] nil)))] + + [(is (subflow? (.-flow-info flow-result) + {:path + [{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1} + {:flow-id :fid2, :nsecs-delta :submap/ex, :count 100} + {:flow-id :flow/end, :nsecs-delta :submap/ex, :count 1}]}) + "Recursion increases path entry count")]) + + (testing "Nested flows" + (let [flow-result + (flow/with-flow *flows* true [:fid-a1 {:a1 :A1}] + (flow/with-flow *flows* true [:fid-b1 {:b1 :B1}] + (do + (flow/update-flow! *flows* [:fid-a1 :fid-a2] {:a2 :A2}) + (flow/update-flow! *flows* [:fid-b1 :fid-b2] {:b2 :B2}) + (flow/flow-info *flows*)))) + + {:keys [fid-a1 fid-b1] :as _inner-flow-info} + (.-form-result ^FlowResult + (.-form-result flow-result))] + + [(is (subflow? fid-a1 + {:flow-id :fid-a1, + :state {:a1 :A1, :a2 :A2}, + :total-nsecs :submap/ex, + :path ; Note no :flow/end + [{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1} + {:flow-id :fid-a2, :nsecs-delta :submap/ex, :count 1}]})) + + (is (subflow? fid-b1 + {:flow-id :fid-b1, + :state {:b1 :B1, :b2 :B2}, + :total-nsecs :submap/ex, + :path ; Note no :flow/end + [{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1} + {:flow-id :fid-b2, :nsecs-delta :submap/ex, :count 1}]})) + + (is (subflow? (.flow-info flow-result) ; Outer flow-result + {:flow-id :fid-a1 + :state {:a1 :A1, :a2 :A2} + :path + [{:flow-id :flow/start, :nsecs-delta :submap/ex, :count 1} + {:flow-id :fid-a2, :nsecs-delta :submap/ex, :count 1} + {:flow-id :flow/end, :nsecs-delta :submap/ex, :count 1}]}))]))]) + ;;;; #?(:cljs (test/run-tests))