diff --git a/src/taoensso/encore.cljc b/src/taoensso/encore.cljc index 3692ffe2..b9b7b615 100644 --- a/src/taoensso/encore.cljc +++ b/src/taoensso/encore.cljc @@ -104,6 +104,7 @@ [java.util Date Locale TimeZone] [java.text SimpleDateFormat] [java.util.concurrent CountDownLatch] + [java.util.concurrent.atomic AtomicReference] ;; [org.apache.commons.codec.binary Base64] ) @@ -112,7 +113,7 @@ [taoensso.encore :as enc-macros :refer [have have! have? compile-if if-let if-some if-not when when-not when-some when-let -cond cond - def* defonce cond! try* catching -if-cas! now-dt* now-udt* now-nano* min* max* + def* defonce cond! try* catching -cas!? now-dt* now-udt* now-nano* min* max* name-with-attrs deprecated new-object defalias throws throws? identical-kw?]]))) @@ -2527,6 +2528,82 @@ (select-nested-keys {:a 1 :b 1 :c 1} [:a :c]) (select-keys {:a 1 :b 1 :c 1} [:a :c]))) +;;;; LightAtom + +#?(:cljs + (deftype LightAtom [^:mutable state] + IDeref (-deref [_ ] state) + IReset (-reset! [_ new] (set! state new) new) + IFn + (-invoke [_ swap-fn] (let [new (swap-fn state)] (set! state new) new)) + (-invoke [_ k swap-fn] + (let [old-map state + new-val (swap-fn (get old-map k)) + new-map (assoc old-map k new-val)] + (set! state new-map) + (do new-val)))) + + :clj + (deftype LightAtom [^AtomicReference aref] + clojure.lang.IDeref (deref [_] (.get aref)) + clojure.lang.IAtom + (reset [_ new] (.set aref new) new) + (compareAndSet [_ old new] (.compareAndSet aref old new)) + + clojure.lang.IFn + (invoke [_ swap-fn] + (loop [] + (let [old (.get aref) + new (swap-fn old)] + (if (.compareAndSet aref old new) + new + (recur))))) + + (invoke [_ k swap-fn] + (loop [] + (let [old-map (.get aref) + new-val (swap-fn (get old-map k)) + new-map (assoc old-map k new-val)] + (if (.compareAndSet aref old-map new-map) + new-val + (recur))))))) + +(defn ^:no-doc ^LightAtom light-atom + "Private implementation detail. + Micro-optimized `atom` for internal use." + {:added "vX.Y.Z (YYYY-MM-DD)"} + [init-state] + (LightAtom. + #?(:clj (AtomicReference. init-state) + :cljs init-state))) + +#?(:clj + (let [atom-tag (compile-if clojure.lang.IAtom 'clojure.lang.IAtom 'clojure.lang.Atom)] + (defmacro ^:no-doc -cas!? + "Private implementation detail. + Micro-optimized `compare-and-set!` for internal use." + {:added "vX.Y.Z (YYYY-MM-DD)"} + [atom_ old-val new-val] + (if (:ns &env) + `(compare-and-set! ~atom_ ~old-val ~new-val) + `(.compareAndSet ~(with-meta atom_ {:tag atom-tag}) + ~old-val ~new-val))))) + +(comment (let [a (atom nil)] (qb 1e6 (compare-and-set! a 0 1) (-cas!? a 0 1)))) ; [50.06 35.64] + +(comment + (let [a (atom 0), v (volatile! 0), l (light-atom 0)] + {:deref (qb 2e6 @a @v @l) + :new (qb 2e6 (atom 0) (volatile! 0) (light-atom 0)) + :swap (qb 2e6 (swap! a inc) (vswap! v inc) (l inc)) + :cas (qb 2e6 (compare-and-set! a 0 1) (-cas!? l 0 1))}) + + {:deref [83.84 85.19 86.35], ; ~same + :new [107.61 82.19 81.7], ; 15% faster + :swap [130.55 109.94 98.39], ; 15% faster + :cas [101.78 70.39] ; 20% faster + }) + ;;;; Swap API ;; - reset-in! ; Keys: 0, 1, n (general) ;; - reset-val! ; Keys: 1 (optimized) @@ -2540,26 +2617,12 @@ ;; ;; - pull-val! ; Keys: 1 (optimized, common transform) -(compile-if clojure.lang.IAtom - (def ^:private ^:const atom-tag 'clojure.lang.IAtom) - (def ^:private ^:const atom-tag 'clojure.lang.Atom)) - -#?(:clj - (defmacro ^:no-doc -if-cas! - "Micro optimization, mostly for Cljs. - Implementation detail, but public for use by other Taoensso libs." - [atom_ old-val new-val then & [else]] - (if (:ns &env) - `(do (reset! ~atom_ ~new-val) ~then) - `(if (.compareAndSet ~(with-meta atom_ {:tag atom-tag}) ~old-val ~new-val) - ~then ~else)))) - (defn- -reset-k0! "Impln. for 0-key resets" [return atom_ m1] (loop [] (let [m0 @atom_] - (-if-cas! atom_ m0 m1 + (if (-cas!? atom_ m0 m1) (return m0 m0 m1 m1) ; [m0 v0 m1 v1] (recur))))) @@ -2569,7 +2632,7 @@ (loop [] (let [m0 @atom_ m1 (assoc m0 k v1)] - (-if-cas! atom_ m0 m1 + (if (-cas!? atom_ m0 m1) (return m0 (get m0 k not-found) m1 v1) ; [m0 v0/nx m1 v1] (recur))))) @@ -2581,7 +2644,7 @@ (loop [] (let [m0 @atom_ m1 (assoc-in m0 ks v1)] - (-if-cas! atom_ m0 m1 + (if (-cas!? atom_ m0 m1) (return m0 (get-in m0 ks not-found) m1 v1) ; [m0 v0/nx m1 v1] (recur)))) @@ -2626,7 +2689,7 @@ [atom_ val] (loop [] (let [old @atom_] - (-if-cas! atom_ old val + (if (-cas!? atom_ old val) (not= old val) (recur))))) @@ -2667,7 +2730,7 @@ (return-swapped s1 m0 m1) ; rv (return m0 m0 m0 m0)) ; [m0 v0 m1 v1] - (-if-cas! atom_ m0 m1 + (if (-cas!? atom_ m0 m1) (if sw? (return-swapped s1 m0 m1) ; rv (return m0 m0 m1 m1)) ; [m0 v0 m1 v1] @@ -2680,7 +2743,7 @@ (loop [] (let [m0 @atom_ m1 (dissoc m0 k)] - (-if-cas! atom_ m0 m1 + (if (-cas!? atom_ m0 m1) (return m0 (get m0 k not-found) m1 :swap/dissoc) ; [m0 v0/nx m1 v1] (recur)))) @@ -2701,7 +2764,7 @@ (dissoc m0 k) (assoc m0 k v1))] - (-if-cas! atom_ m0 m1 + (if (-cas!? atom_ m0 m1) (if sw? (return-swapped s1 m0 m1) ; rv (return m0 v0 m1 v1)) ; [m0 v0/nx m1 v1] @@ -2717,7 +2780,7 @@ (loop [] (let [m0 @atom_ m1 (dissoc-in m0 ks)] - (-if-cas! atom_ m0 m1 + (if (-cas!? atom_ m0 m1) (return m0 (get-in m0 ks not-found) m1 :swap/dissoc) ; [m0 v0/nx m1 v1] (recur)))) @@ -2738,7 +2801,7 @@ (dissoc-in m0 ks) (assoc-in m0 ks v1))] - (-if-cas! atom_ m0 m1 + (if (-cas!? atom_ m0 m1) (if sw? (return-swapped s1 m0 m1) ; rv (return m0 v0 m1 v1)) ; [m0 v0/nx m1 v1] @@ -2823,11 +2886,11 @@ "Like `core/memoize` but only caches the fn's most recent call. Great for Reactjs render op caching on mobile devices, etc." [f] - (let [cache_ (atom {})] + (let [cache_ (light-atom {})] (fn [& args] @(or (get @cache_ args) - (get (swap! cache_ - (fn [cache] + (get (cache_ + (fn swap-fn [cache] (if (get cache args) cache {args (delay (apply f args))}))) @@ -2887,17 +2950,6 @@ (declare top) -(defn ^:no-doc -swap-val! - "Used internally by memoization utils." - [atom_ k f] - (loop [] - (let [m0 @atom_ - v1 (f (get m0 k)) - m1 (assoc m0 k v1)] - (-if-cas! atom_ m0 m1 - v1 - (recur))))) - (defn cache "Returns a cached version of given referentially transparent function `f`. @@ -2986,8 +3038,8 @@ size ; De-raced, commands, ttl, gc, max-size (let [gc-now? gc-now? ticker (counter) - cache_ (atom nil) ; { } - latch_ (atom nil) ; Used to pause writes during gc + cache_ (light-atom nil) ; { } + latch_ (light-atom nil) ; Used to pause writes during gc ttl-ms (long (or ttl-ms 0)) ttl? (not (zero? ttl-ms)) size (long size) @@ -3001,7 +3053,7 @@ a2 (first argn)] (if (case a2 (:cache/all :mem/all) true false) (reset! cache_ nil) - (swap! cache_ dissoc argn)) + (cache_ #(dissoc % argn))) nil) (let [^long tick (ticker) ; Always inc, even on reads @@ -3014,49 +3066,53 @@ (>= (count @cache_) (* 1.1 size))) (let [latch #?(:clj (CountDownLatch. 1) :cljs nil)] - (-if-cas! latch_ nil latch - (do - ;; First prune ttl-expired stuff - (when ttl? - (swap! cache_ - (fn [m] - (persistent! - (reduce-kv - (fn [acc k ^TickedCacheEntry e] - (if (> (- instant (.-udt e)) ttl-ms) - (dissoc! acc k) - acc)) - (transient (or m {})) - m))))) - - ;; Then prune by ascending (worst) tick-sum: - (let [snapshot @cache_ - n-to-gc (- (count snapshot) size)] - - (when (>= n-to-gc (* 0.1 size)) - (let [ks-to-gc - (top n-to-gc - (fn [k] - (let [e ^TickedCacheEntry (get snapshot k)] - (+ (.-tick-lru e) (.-tick-lfu e)))) - (keys snapshot))] - - (swap! cache_ - (fn [m] - (persistent! - (reduce (fn [acc in] (dissoc! acc in)) - (transient (or m {})) ks-to-gc))))))) - - #?(:clj (reset! latch_ nil)) - #?(:clj (.countDown latch)))))) + + (when (-cas!? latch_ nil latch) + ;; First prune ttl-expired stuff + (when ttl? + (cache_ + (fn swap-fn [m] + (persistent! + (reduce-kv + (fn [acc k ^TickedCacheEntry e] + (if (> (- instant (.-udt e)) ttl-ms) + (dissoc! acc k) + acc)) + (transient (or m {})) + m))))) + + ;; Then prune by ascending (worst) tick-sum: + (let [snapshot @cache_ + n-to-gc (- (count snapshot) size)] + + (when (>= n-to-gc (* 0.1 size)) + (let [ks-to-gc + (top n-to-gc + (fn [k] + (let [e ^TickedCacheEntry (get snapshot k)] + (+ (.-tick-lru e) (.-tick-lfu e)))) + (keys snapshot))] + + (cache_ + (fn swap-fn [m] + (persistent! + (reduce (fn [acc in] (dissoc! acc in)) + (transient (or m {})) ks-to-gc))))))) + + #?(:clj + (do + (reset! latch_ nil) + (.countDown latch))) + + nil))) (let [fresh? (case a1 (:cache/fresh :mem/fresh) true false) args (if fresh? (next args) args) _ #?(:clj (when-let [l @latch_] (.await ^CountDownLatch l)) :cljs nil) ^TickedCacheEntry e - (-swap-val! cache_ args - (fn [?e] + (cache_ args + (fn swap-fn [?e] (if (or (nil? ?e) fresh? (> (- instant (.-udt ^TickedCacheEntry ?e)) ttl-ms)) (TickedCacheEntry. (delay (apply f args)) instant tick 1) @@ -3068,8 +3124,8 @@ ttl-ms ; De-raced, commands, ttl, gc (let [gc-now? gc-now? - cache_ (atom nil) ; { } - latch_ (atom nil) ; Used to pause writes during gc + cache_ (light-atom nil) ; { } + latch_ (light-atom nil) ; Used to pause writes during gc ttl-ms (long ttl-ms) gc-rate (let [gce (or gc-every 8e3)] @@ -3083,35 +3139,38 @@ a2 (first argn)] (if (case a2 (:cache/all :mem/all) true false) (reset! cache_ nil) - (swap! cache_ dissoc argn)) + (cache_ #(dissoc % argn))) nil) (let [instant (now-udt*)] (when (gc-now? gc-rate) (let [latch #?(:clj (CountDownLatch. 1) :cljs nil)] - (-if-cas! latch_ nil latch - (do - (swap! cache_ - (fn [m] - (persistent! - (reduce-kv - (fn [acc k ^SimpleCacheEntry e] - (if (> (- instant (.-udt e)) ttl-ms) - (dissoc! acc k) - acc)) - (transient (or m {})) - m)))) - - #?(:clj (reset! latch_ nil)) - #?(:clj (.countDown latch)))))) + (when (-cas!? latch_ nil latch) + (cache_ + (fn swap-fn [m] + (persistent! + (reduce-kv + (fn [acc k ^SimpleCacheEntry e] + (if (> (- instant (.-udt e)) ttl-ms) + (dissoc! acc k) + acc)) + (transient (or m {})) + m)))) + + #?(:clj + (do + (reset! latch_ nil) + (.countDown latch))) + + nil))) (let [fresh? (case a1 (:cache/fresh :mem/fresh) true false) args (if fresh? (next args) args) _ #?(:clj (when-let [l @latch_] (.await ^CountDownLatch l)) :cljs nil) ^SimpleCacheEntry e - (-swap-val! cache_ args - (fn [?e] + (cache_ args + (fn swap-fn [?e] (if (or (nil? ?e) fresh? (> (- instant (.-udt ^SimpleCacheEntry ?e)) ttl-ms)) (SimpleCacheEntry. (delay (apply f args)) instant) @@ -3214,8 +3273,8 @@ ([opts spec] (if (empty? spec) [nil (constantly nil)] - (let [latch_ (atom nil) ; Used to pause writes during gc - reqs_ (atom nil) ; { { }} + (let [latch_ (light-atom nil) ; Used to pause writes during gc + reqs_ (light-atom nil) ; { { }} spec (coerce-limit-spec spec) ; { } {:keys [req-id-fn gc-every] @@ -3233,31 +3292,34 @@ (when (and (not peek?) (gc-now? gc-rate)) (let [latch #?(:clj (CountDownLatch. 1) :cljs nil)] - (-if-cas! latch_ nil latch - (do - (swap! reqs_ - (fn [reqs] ; { } - (persistent! - (reduce-kv - (fn [acc rid entries] - (let [new-entries - (reduce-kv - (fn [acc lid ^LimitEntry e] - (if-let [^LimitSpec s (get spec lid)] - (if (>= instant (+ (.-udt0 e) (.-ms s))) - (dissoc acc lid) - acc) - (dissoc acc lid))) - entries ; {} - entries)] - (if (empty? new-entries) - (dissoc! acc rid) - (assoc! acc rid new-entries)))) - (transient (or reqs {})) - reqs)))) - - #?(:clj (reset! latch_ nil)) - #?(:clj (.countDown latch)))))) + (when (-cas!? latch_ nil latch) + (reqs_ + (fn swap-fn [reqs] ; { } + (persistent! + (reduce-kv + (fn [acc rid entries] + (let [new-entries + (reduce-kv + (fn [acc lid ^LimitEntry e] + (if-let [^LimitSpec s (get spec lid)] + (if (>= instant (+ (.-udt0 e) (.-ms s))) + (dissoc acc lid) + acc) + (dissoc acc lid))) + entries ; {} + entries)] + (if (empty? new-entries) + (dissoc! acc rid) + (assoc! acc rid new-entries)))) + (transient (or reqs {})) + reqs)))) + + #?(:clj + (do + (reset! latch_ nil) + (.countDown latch))) + + nil))) ;; Need to atomically check if all limits pass before ;; committing to any n increments: @@ -3310,7 +3372,7 @@ entries spec)] - (-if-cas! reqs_ reqs (assoc reqs rid new-entries) + (if (-cas!? reqs_ reqs (assoc reqs rid new-entries)) nil (recur)))))))))] @@ -3324,7 +3386,7 @@ (do (if (identical-kw? req-id :rl/all) (reset! reqs_ nil) - (swap! reqs_ dissoc (req-id-fn req-id))) + (reqs_ #(dissoc % (req-id-fn req-id)))) nil) :rl/peek (f1 req-id true) @@ -3420,8 +3482,8 @@ n-skip1 (- n-total n-window)] ;; (println {:n-total n-total :n-window n-window :n-skip0 n-skip0 :n-skip1 n-skip1}) - (when (< n-skip0 n-skip1) - (-if-cas! n-skip_ n-skip0 n-skip1 + (when (< n-skip0 n-skip1) + (if (-cas!? n-skip_ n-skip0 n-skip1) (when (> n-skip1 10000) ; Time to gc, amortised cost (gc-fn n-skip1)))) @@ -3432,7 +3494,7 @@ clojure.lang.IFn (invoke [this] (when-let [p @p_] @p) ; Block iff latched - (swap! ts_ (let [t1 (now-udt*)] (fn [v] (conj v t1)))) + (let [t1 (now-udt*)] (ts_ #(conj % t1))) this ; Return to allow optional deref ) @@ -3442,9 +3504,9 @@ (rc-deref msecs ts_ n-skip_ (fn gc [n-skip1] (let [p (promise)] - (-if-cas! p_ nil p ; Latch + (if (-cas!? p_ nil p) ; Latch (do - (swap! ts_ (fn [v] (subvec v n-skip1))) + (ts_ #(subvec % n-skip1)) (reset! n-skip_ 0) (reset! p_ nil) (deliver p nil)))))))) @@ -3453,7 +3515,7 @@ (deftype RollingCounter [^long msecs ts_ n-skip_] IFn (-invoke [this] - (swap! ts_ (let [t1 (now-udt*)] (fn [v] (conj v t1)))) + (let [t1 (now-udt*)] (ts_ #(conj % t1))) this ; Return to allow optional deref ) @@ -3461,7 +3523,7 @@ (-deref [_] (rc-deref msecs ts_ n-skip_ (fn gc [n-skip1] - (swap! ts_ (fn [v] (subvec v n-skip1))) + (ts_ #(subvec % n-skip1)) (reset! n-skip_ 0)))))) (defn rolling-counter @@ -3471,9 +3533,9 @@ [msecs] (RollingCounter. (long (have pos-int? msecs)) - (atom []) - (atom 0) - #?(:clj (atom nil)))) + (light-atom []) + (light-atom 0) + #?(:clj (light-atom nil)))) (comment (def myrc (rolling-counter 4000)) @@ -3497,10 +3559,10 @@ :or {gc-every 16e3}}] (let [nmax (long nmax) - acc_ (atom (vec init-val)) + acc_ (light-atom (vec init-val)) gc-every (when gc-every (long gc-every)) ticker (when gc-every (counter)) - latch_ (when gc-every (atom nil))] + latch_ (when gc-every (light-atom nil))] (fn rolling-vec-fn ([ ] @acc_) @@ -3510,16 +3572,16 @@ (let [^long tick (ticker)] (when-let [gc-now? (== (rem tick ^long gc-every) 0)] - #?(:cljs (swap! acc_ (fn [sv] (into [] sv))) + #?(:cljs (acc_ (fn swap-fn [sv] (into [] sv))) :clj (let [latch (CountDownLatch. 1)] - (when (compare-and-set! latch_ nil latch) - (swap! acc_ (fn [sv] (into [] sv))) + (when (-cas!? latch_ nil latch) + (acc_ (fn swap-fn [sv] (into [] sv))) (reset! latch_ nil) (.countDown latch))))))) - (swap! acc_ - (fn [acc] + (acc_ + (fn swap-fn [acc] (let [new (conj acc x)] (if (> (count new) nmax) (subvec new 1) @@ -5053,18 +5115,18 @@ (do (callback-fn {:?error :xhr-pool-depleted}) nil) :else - (let [done?_ (atom false)] + (let [done?_ (light-atom false)] (.setTimeout js/window (fn xhr-timeout [] - (when (compare-and-set! done?_ false true) + (when (-cas!? done?_ false true) (callback-fn {:?error :xhr-pool-timeout}))) xhr-timeout-ms) (.getObject xhr-pool (fn xhr-cb [xhr] ;; We've acquired xhr after some time - (if (compare-and-set! done?_ false true) + (if (-cas!? done?_ false true) (with-xhr xhr) (.releaseObject xhr-pool xhr)))) nil))))) @@ -5541,7 +5603,7 @@ (tf-done? [_] (not (identical-kw? @result__ -tout-pending))) (tf-pending? [_] (identical-kw? @result__ -tout-pending)) (tf-cancelled? [_] (identical-kw? @result__ -tout-cancelled)) - (tf-cancel! [_] (compare-and-set! result__ -tout-pending -tout-cancelled)) + (tf-cancel! [_] (-cas!? result__ -tout-pending -tout-cancelled)) IPending (-realized? [t] (tf-done? t)) IDeref (-deref [t] (tf-poll t)))) @@ -5556,7 +5618,7 @@ (tf-pending? [_] (identical-kw? @result__ -tout-pending)) (tf-cancelled? [_] (identical-kw? @result__ -tout-cancelled)) (tf-cancel! [_] - (if (compare-and-set! result__ -tout-pending -tout-cancelled) + (if (-cas!? result__ -tout-pending -tout-cancelled) (do (.countDown latch) true) false)) @@ -5593,12 +5655,12 @@ ([impl_ msecs f] (let [msecs (long msecs) udt (+ (now-udt*) msecs) ; Approx instant to run - result__ (atom -tout-pending) + result__ (light-atom -tout-pending) #?(:clj latch) #?(:clj (java.util.concurrent.CountDownLatch. 1)) cas-f (fn [] (let [result_ (delay (f))] - (when (compare-and-set! result__ -tout-pending result_) + (when (-cas!? result__ -tout-pending result_) @result_ #?(:clj (.countDown latch)))))] @@ -5630,6 +5692,18 @@ (when-not elide? `(do ~@body))))) (deprecated + (defn ^:no-doc -swap-val! + "Prefer `light-atom`." + {:deprecated "vX.Y.Z (YYYY-MM-DD)"} + [atom_ k f] + (loop [] + (let [m0 @atom_ + v1 (f (get m0 k)) + m1 (assoc m0 k v1)] + (if (-cas!? atom_ m0 m1) + v1 + (recur))))) + #?(:cljs (def ^:no-doc ^:deprecated regular-num? finite-num?)) #?(:cljs (def ^:no-doc ^:deprecated get-window-location get-win-loc)) #?(:clj (def ^:no-doc ^:deprecated srng secure-rng)) @@ -6031,7 +6105,14 @@ (defn ^:no-doc ^:deprecated kw-identical? "Prefer `identical-kw?` macro." #?(:cljs {:tag boolean}) - [x y] (identical-kw? x y))) + [x y] (identical-kw? x y)) + + #?(:clj + (defmacro ^:no-doc -if-cas! + "Prefer `-cas!?`." + {:deprecated "vX.Y.Z (YYYY-MM-DD)"} + [atom_ old-val new-val then & [else]] + `(if (-cas!? ~atom_ ~old-val ~new-val) ~then ~else)))) (deprecated ;; v3.66.0 (2023-08-23) - unified config API diff --git a/test/taoensso/encore_tests.cljc b/test/taoensso/encore_tests.cljc index c7977130..f8f56b7b 100644 --- a/test/taoensso/encore_tests.cljc +++ b/test/taoensso/encore_tests.cljc @@ -619,6 +619,15 @@ [(is (enc/can-meta? [])) (is (not (enc/can-meta? "foo")))]) +;;;; LightAtom + +(deftest _light-atom + [(is (= @(enc/light-atom :foo) :foo)) + (is (= (let [la (enc/light-atom 0) ] [(la inc) @la]) [1 1 ])) + (is (= (let [la (enc/light-atom {:a 0})] [(la :a inc) @la]) [1 {:a 1}])) + (is (true? (compare-and-set! (enc/light-atom 0) 0 1))) + (is (false? (compare-and-set! (enc/light-atom 1) 0 1)))]) + ;;;; Name filter (deftest _name-filter