Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge changes from nathanmarz/storm master #17

Merged
merged 45 commits into from
Nov 3, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
7f10c1e
Disruptor queue metrics (#613)
Jul 20, 2013
049905b
Fixes schema for isolation.scheduler.machines: Map
Aug 21, 2013
a909a3c
Merge pull request #660 from d2r/d2r-fix-iso-mach-config-schema
nathanmarz Aug 21, 2013
46cc0fc
make FixedTupleSpout implement IRichSpout so it can be used for tests
Aug 26, 2013
85895c8
unique URLs in findAndReadConfigFile to work around apparent sbt/nsc …
Aug 26, 2013
cdbdf8d
oops
Aug 26, 2013
556b2a7
Merge pull request #664 from jaked/unique-config-files
nathanmarz Aug 27, 2013
206b42b
Merge pull request #663 from jaked/FixedTupleSpout-implements-IRichSpout
nathanmarz Aug 27, 2013
8c14994
Ignoring a FileNotFound excpetion that can bring down a supervisor.
Aug 28, 2013
21805e3
ui and log viewer send proper html pages
Aug 28, 2013
d85d83d
latest version of leiningen breaks bin/build_release.sh creating zip …
Aug 29, 2013
6f7ed1e
Merge pull request #670 from jasonjckn/lein2.3_fix
nathanmarz Aug 29, 2013
b8f4055
Moved exception handling to rmr.
Aug 29, 2013
9d0c4e6
Wrapping all curator exceptions in RuntimeExceptions.
Sep 3, 2013
70e9825
Update nimbus.clj
ankitoshniwal Sep 3, 2013
e0acf0a
Update nimbus.clj
ankitoshniwal Sep 3, 2013
98eb38b
Update nimbus.clj
ankitoshniwal Sep 3, 2013
0240261
Update nimbus.clj
ankitoshniwal Sep 4, 2013
043500c
Merge pull request #672 from ankitoshniwal/master
nathanmarz Sep 4, 2013
94f664e
Merge branch 'nmarz-master' into WrapNonRuntimeReportError
Sep 5, 2013
cffa51d
Now only wrap exceptions that are not RuntimeExceptions.
Sep 5, 2013
97a454c
Merge pull request #671 from revans2/WrapNonRuntimeReportError
nathanmarz Sep 19, 2013
fc5fbb8
Merge pull request #667 from revans2/FNF-fix
nathanmarz Sep 19, 2013
030c11e
Merge pull request #633 from infochimps-labs/metrics_disruptor_producer
nathanmarz Sep 19, 2013
da8e666
Merge pull request #669 from d2r/d2r-html-template
nathanmarz Sep 19, 2013
449e4e4
Prep for 0.9.0-rc1 release: bump version and add KEYS file for artifa…
ptgoetz Sep 24, 2013
d8dc1bd
update changelog for 0.9.0-rc1, prep for rc2
ptgoetz Sep 24, 2013
8b16230
Fix broken JAR_JVM_OPTS
Sep 24, 2013
0671cb8
Fixing STORM_JAR_JVM_OPTS
Sep 24, 2013
96a433a
Using @xumingming recommendation
Sep 26, 2013
ab08cb2
Merge pull request #683 from roadkill001/master
ptgoetz Sep 27, 2013
32098d5
Release 0.9.0-rc2
ptgoetz Sep 27, 2013
6b7c62e
bump version
ptgoetz Sep 27, 2013
0616f59
Use 'equals()' method to String comparison instead '=='
miofthena Sep 30, 2013
504b552
Merge pull request #691 from miofthena/patch-1
ptgoetz Sep 30, 2013
d0203c2
Adds macro while-timout to avoid test hangs
Oct 1, 2013
b66d028
Add in the ability to configure the number of worker threads for Nett…
Oct 1, 2013
96f2342
Merge pull request #692 from d2r/d2r-testing-while-timeout
nathanmarz Oct 7, 2013
1e3d266
Merge pull request #693 from revans2/netty-tuning
ptgoetz Oct 7, 2013
483ce45
fix the issue that tick tuple cannot work with system bolt
xumingming Oct 9, 2013
edbb17c
Do the worker HB timeout check when HB's are updated
Oct 10, 2013
213102b
Fixed negative netty sleep values.
brndnmtthws Oct 18, 2013
3985de7
Merge pull request #713 from brndnmtthws/nm
ptgoetz Oct 23, 2013
a0bc262
Merge pull request #706 from d2r/d2r-nimbus-hb-check-timeout-on-update
ptgoetz Oct 25, 2013
c3db916
Merge pull request #702 from xumingming/fix-system-bolt
ptgoetz Oct 25, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
## Unreleased (0.9.0)
## 0.9.0-rc3 (Unreleased)

## 0.9.0-rc2

* Fixed `storm jar` command to work properly when STORM_JAR_JVM_OPTS is not specified (thanks roadkill001)

## 0.9.0-rc1

* All logging now done with slf4j
* Replaced log4j logging system with logback
Expand Down
24 changes: 24 additions & 0 deletions KEYS
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
pub 2048R/E80B8FFD 2012-03-05 [expires: 2018-03-05]
uid P. Taylor Goetz <[email protected]>
sig 3 E80B8FFD 2012-03-05 P. Taylor Goetz <[email protected]>

-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG/MacGPG2 v2.0.20 (Darwin)
Comment: GPGTools - http://gpgtools.org

mQENBE9VAVUBCADwWjI9USSW4xx45L0KSeHiu+rT1t2eolKx+yxxfMC9QJWb1uGt
WCKG2zb2lk6DBej2/vF6v6EA6d+esOZfmSZazkd61q0INyimuxi0PBHEjipWD/f3
uj87ylGY6WbhQjv60eRlQLMH5Md7zGtzUQGmi7BlogTiwWvcYGvYjmkpk6AyGrE2
9VhJrtRMXpX53V1iL79Z8QR6l688oyuxV3OmPVQMJADtqbXMrDiHk+nSpVuZT5gm
CA3Fl5zfq7RdsPLrJeNDNM+sL0IuKiFX5U2RVuXF3G4BWoBoHtot8ZG01YhKP7gG
/7l2fLd5q/sytCcahT7uLTG/rIC829tFvjMvABEBAAG0I1AuIFRheWxvciBHb2V0
eiA8cHRnb2V0ekBnbWFpbC5jb20+iQE+BBMBAgAoBQJPVQFVAhsvBQkLSIaABgsJ
CAcDAgYVCAIJCgsEFgIDAQIeAQIXgAAKCRCN4Dli6AuP/bqmB/9/U1AzfpMFJ/dY
noqCY2yEYV54Bm6e59qlYUifPEFCMKULB5IzMdyou2DYoUrJquHTYdsHUBTr8cuN
4wVnro8AsryNXjo8oFmE9JwrrO6jE5GLt1OTvri+e0MYgvb08Fk54aZg/zXTcNNS
pIdkbLDBj/RL5jdflKAFuYKSsIEaj0bCvECoR1CRPfTJX2XtPDzRTP28ccRu/pEz
2I588JSZ/RSjqk9DW2Mh75g1CBocRLp90qhW9jUoCkZb0Pis8jnm5gkcHYOz5Hpr
qPzxjZOlMD+cLkP9Geo0+Gs13tt3rwBgIE0l/mPdRltPBbQ9xXORoMlGHtZlXZrn
qSx4e87y
=RfYX
-----END PGP PUBLIC KEY BLOCK-----
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.9.0-wip21
0.9.0-rc3
2 changes: 1 addition & 1 deletion bin/storm
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def jar(jarfile, klass, *args):
jvmtype="-client",
extrajars=[jarfile, USER_CONF_DIR, STORM_DIR + "/bin"],
args=args,
jvmopts=[JAR_JVM_OPTS + " -Dstorm.jar=" + jarfile])
jvmopts=[' '.join(filter(None, [JAR_JVM_OPTS, "-Dstorm.jar=" + jarfile]))])

def kill(*args):
"""Syntax: [storm kill topology-name [-w wait-time-secs]]
Expand Down
7 changes: 7 additions & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ zmq.threads: 1
zmq.linger.millis: 5000
zmq.hwm: 0

storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880 #5MB buffer
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100

### topology.* configs are for specific executing storms
topology.enable.message.timeouts: true
topology.debug: false
Expand Down
1 change: 1 addition & 0 deletions storm-console-logging/project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

(defproject storm/storm-console-logging VERSION
:resource-paths ["logback"]
:target-path "target"

:profiles {:release {}
}
Expand Down
1 change: 1 addition & 0 deletions storm-core/project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
:java-source-paths ["src/jvm"]
:test-paths ["test/clj"]
:resource-paths ["../conf"]
:target-path "target"

:profiles {:dev {:resource-paths ["src/dev"]
:dependencies [[org.mockito/mockito-all "1.9.5"]]}
Expand Down
7 changes: 6 additions & 1 deletion storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
(ns backtype.storm.daemon.builtin-metrics
(:import [backtype.storm.metric.api MultiCountMetric MultiReducedMetric MeanReducer])
(:import [backtype.storm.metric.api MultiCountMetric MultiReducedMetric MeanReducer StateMetric])
(:import [backtype.storm Config])
(:use [backtype.storm.stats :only [stats-rate]]))

Expand Down Expand Up @@ -36,6 +36,11 @@
(.registerMetric topology-context (str "__" (name kw)) imetric
(int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))

(defn register-queue-metrics [queues storm-conf topology-context]
(doseq [[qname q] queues]
(.registerMetric topology-context (str "__" (name qname)) (StateMetric. q)
(int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))

(defn spout-acked-tuple! [^BuiltinSpoutMetrics m stats stream latency-ms]
(-> m .ack-count (.scope stream) (.incrBy (stats-rate stats)))
(-> m .complete-latency (.scope stream) (.update latency-ms)))
Expand Down
25 changes: 20 additions & 5 deletions storm-core/src/clj/backtype/storm/daemon/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
(:import [backtype.storm.spout ISpoutWaitStrategy])
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
(:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint])
(:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric])
(:import [backtype.storm Config])
(:require [backtype.storm [tuple :as tuple]])
(:require [backtype.storm.daemon [task :as task]])
(:require [backtype.storm.daemon.builtin-metrics :as builtin-metrics]))
Expand Down Expand Up @@ -282,9 +283,10 @@
receive-queue (:receive-queue executor-data)
context (:worker-context executor-data)]
(when tick-time-secs
(if (and (not (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
(= :spout (:type executor-data)))
(log-message "Timeouts disabled for executor " (:executor-id executor-data))
(if (or (system-id? (:component-id executor-data))
(and (not (storm-conf TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS))
(= :spout (:type executor-data))))
(log-message "Timeouts disabled for executor " (:component-id executor-data) ":" (:executor-id executor-data))
(schedule-recurring
(:user-timer worker)
tick-time-secs
Expand Down Expand Up @@ -493,6 +495,10 @@
(or out-tasks [])
))]]
(builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))
(builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
:receive receive-queue}
storm-conf (:user-context task-data))

(.open spout-obj
storm-conf
(:user-context task-data)
Expand Down Expand Up @@ -591,7 +597,6 @@
;; TODO: how to handle incremental updates as well as synchronizations at same time
;; TODO: need to version tuples somehow


;;(log-debug "Received tuple " tuple " at task " task-id)
;; need to do it this way to avoid reflection
(let [stream-id (.getSourceStreamId tuple)]
Expand Down Expand Up @@ -656,6 +661,16 @@
(MessageId/makeId anchors-to-ids)))))
(or out-tasks [])))]]
(builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
(if (= component-id Constants/SYSTEM_COMPONENT_ID)
(builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
:receive (:receive-queue executor-data)
:transfer (:transfer-queue (:worker executor-data))}
storm-conf user-context)
(builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
:receive (:receive-queue executor-data)}
storm-conf user-context)
)

(.prepare bolt-obj
storm-conf
user-context
Expand Down
2 changes: 1 addition & 1 deletion storm-core/src/clj/backtype/storm/daemon/logviewer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
(str "effective log level for " name " is " (.getLevel (.getLogger log)))))

(defn log-template [body]
(html
(html4
[:head
[:title "Storm log viewer"]
(include-css "/css/bootstrap-1.1.0.css")
Expand Down
25 changes: 15 additions & 10 deletions storm-core/src/clj/backtype/storm/daemon/nimbus.clj
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@
;; Does not assume that clocks are synchronized. Executor heartbeat is only used so that
;; nimbus knows when it's received a new heartbeat. All timing is done by nimbus and
;; tracked through heartbeat-cache
(defn- update-executor-cache [curr hb]
(defn- update-executor-cache [curr hb timeout]
(let [reported-time (:time-secs hb)
{last-nimbus-time :nimbus-time
last-reported-time :executor-reported-time} curr
Expand All @@ -338,15 +338,18 @@
(current-time-secs)
last-nimbus-time
)]
{:nimbus-time nimbus-time
{:is-timed-out (and
nimbus-time
(>= (time-delta nimbus-time) timeout))
:nimbus-time nimbus-time
:executor-reported-time reported-time}))

(defn update-heartbeat-cache [cache executor-beats all-executors]
(defn update-heartbeat-cache [cache executor-beats all-executors timeout]
(let [cache (select-keys cache all-executors)]
(into {}
(for [executor all-executors :let [curr (cache executor)]]
[executor
(update-executor-cache curr (get executor-beats executor))]
(update-executor-cache curr (get executor-beats executor) timeout)]
))))

(defn update-heartbeats! [nimbus storm-id all-executors existing-assignment]
Expand All @@ -355,7 +358,8 @@
executor-beats (.executor-beats storm-cluster-state storm-id (:executor->node+port existing-assignment))
cache (update-heartbeat-cache (@(:heartbeats-cache nimbus) storm-id)
executor-beats
all-executors)]
all-executors
((:conf nimbus) NIMBUS-TASK-TIMEOUT-SECS))]
(swap! (:heartbeats-cache nimbus) assoc storm-id cache)))

(defn- update-all-heartbeats! [nimbus existing-assignments topology->executors]
Expand All @@ -380,14 +384,12 @@
(->> all-executors
(filter (fn [executor]
(let [start-time (get executor-start-times executor)
nimbus-time (-> heartbeats-cache (get executor) :nimbus-time)]
is-timed-out (-> heartbeats-cache (get executor) :is-timed-out)]
(if (and start-time
(or
(< (time-delta start-time)
(conf NIMBUS-TASK-LAUNCH-SECS))
(not nimbus-time)
(< (time-delta nimbus-time)
(conf NIMBUS-TASK-TIMEOUT-SECS))
(not is-timed-out)
))
true
(do
Expand Down Expand Up @@ -855,7 +857,10 @@
(defn validate-topology-name! [name]
(if (some #(.contains name %) DISALLOWED-TOPOLOGY-NAME-STRS)
(throw (InvalidTopologyException.
(str "Topology name cannot contain any of the following: " (pr-str DISALLOWED-TOPOLOGY-NAME-STRS))))))
(str "Topology name cannot contain any of the following: " (pr-str DISALLOWED-TOPOLOGY-NAME-STRS))))
(if (clojure.string/blank? name)
(throw (InvalidTopologyException.
("Topology name cannot be blank"))))))

(defn- try-read-storm-conf [conf storm-id]
(try-cause
Expand Down
16 changes: 12 additions & 4 deletions storm-core/src/clj/backtype/storm/testing.clj
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,14 @@
(rmr t)
))

(def TEST-TIMEOUT-MS 5000)

(defmacro while-timeout [timeout-ms condition & body]
`(let [end-time# (+ (System/currentTimeMillis) ~timeout-ms)]
(while ~condition
(when (> (System/currentTimeMillis) end-time#)
(throw (AssertionError. (str "Test timed out (" ~timeout-ms "ms)"))))
~@body)))

(defn wait-until-cluster-waiting
"Wait until the cluster is idle. Should be used with time simulation."
Expand All @@ -176,7 +184,7 @@
supervisors
workers) ; because a worker may already be dead
]
(while (not (every? (memfn waiting?) daemons))
(while-timeout TEST-TIMEOUT-MS (not (every? (memfn waiting?) daemons))
(Thread/sleep 10)
;; (doseq [d daemons]
;; (if-not ((memfn waiting?) d)
Expand Down Expand Up @@ -443,11 +451,11 @@


(let [storm-id (common/get-storm-id state storm-name)]
(while (not (every? exhausted? (spout-objects spouts)))
(while-timeout TEST-TIMEOUT-MS (not (every? exhausted? (spout-objects spouts)))
(simulate-wait cluster-map))

(.killTopologyWithOpts (:nimbus cluster-map) storm-name (doto (KillOptions.) (.set_wait_secs 0)))
(while (.assignment-info state storm-id nil)
(while-timeout TEST-TIMEOUT-MS (.assignment-info state storm-id nil)
(simulate-wait cluster-map))
(when cleanup-state
(doseq [spout (spout-objects spouts)]
Expand Down Expand Up @@ -554,7 +562,7 @@
(not= (global-amt track-id "transferred")
(global-amt track-id "processed"))
))]
(while (waiting?)
(while-timeout TEST-TIMEOUT-MS (waiting?)
;; (println "Spout emitted: " (global-amt track-id "spout-emitted"))
;; (println "Processed: " (global-amt track-id "processed"))
;; (println "Transferred: " (global-amt track-id "transferred"))
Expand Down
2 changes: 1 addition & 1 deletion storm-core/src/clj/backtype/storm/ui/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
:onclick "toggleSys()"}]])

(defn ui-template [body]
(html
(html4
[:head
[:title "Storm UI"]
(include-css "/css/bootstrap-1.1.0.css")
Expand Down
13 changes: 11 additions & 2 deletions storm-core/src/clj/backtype/storm/util.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns backtype.storm.util
(:import [java.net InetAddress])
(:import [java.util Map Map$Entry List ArrayList Collection Iterator HashMap])
(:import [java.io FileReader])
(:import [java.io FileReader FileNotFoundException])
(:import [backtype.storm Config])
(:import [backtype.storm.utils Time Container ClojureTimerTask Utils
MutableObject MutableInt])
Expand All @@ -22,6 +22,13 @@
(:use [backtype.storm log])
)

(defn wrap-in-runtime
"Wraps an exception in a RuntimeException if needed"
[^Exception e]
(if (instance? RuntimeException e)
e
(RuntimeException. e)))

(defmacro defalias
"Defines an alias for a var: a new var with the same root binding (if
any) and similar metadata. The metadata of the alias is its initial
Expand Down Expand Up @@ -431,7 +438,9 @@
(defn rmr [path]
(log-debug "Rmr path " path)
(when (exists-file? path)
(FileUtils/forceDelete (File. path))))
(try
(FileUtils/forceDelete (File. path))
(catch FileNotFoundException e))))

(defn rmpath
"Removes file or directory at the path. Not recursive. Throws exception on failure"
Expand Down
31 changes: 20 additions & 11 deletions storm-core/src/clj/backtype/storm/zookeeper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,25 @@

(defn create-node
([^CuratorFramework zk ^String path ^bytes data mode]
(.. zk (create) (withMode (zk-create-modes mode)) (withACL ZooDefs$Ids/OPEN_ACL_UNSAFE) (forPath (normalize-path path) data)))
(try
(.. zk (create) (withMode (zk-create-modes mode)) (withACL ZooDefs$Ids/OPEN_ACL_UNSAFE) (forPath (normalize-path path) data))
(catch Exception e (throw (wrap-in-runtime e)))))
([^CuratorFramework zk ^String path ^bytes data]
(create-node zk path data :persistent)))

(defn exists-node? [^CuratorFramework zk ^String path watch?]
((complement nil?)
(if watch?
(.. zk (checkExists) (watched) (forPath (normalize-path path)))
(.. zk (checkExists) (forPath (normalize-path path))))))
(try
(if watch?
(.. zk (checkExists) (watched) (forPath (normalize-path path)))
(.. zk (checkExists) (forPath (normalize-path path))))
(catch Exception e (throw (wrap-in-runtime e))))))

(defnk delete-node [^CuratorFramework zk ^String path :force false]
(try-cause (.. zk (delete) (forPath (normalize-path path)))
(catch KeeperException$NoNodeException e
(when-not force (throw e))
)))
(when-not force (throw e)))
(catch Exception e (throw (wrap-in-runtime e)))))

(defn mkdirs [^CuratorFramework zk ^String path]
(let [path (normalize-path path)]
Expand All @@ -103,15 +107,20 @@
(.. zk (getData) (forPath path))))
(catch KeeperException$NoNodeException e
;; this is fine b/c we still have a watch from the successful exists call
nil ))))
nil )
(catch Exception e (throw (wrap-in-runtime e))))))

(defn get-children [^CuratorFramework zk ^String path watch?]
(if watch?
(.. zk (getChildren) (watched) (forPath (normalize-path path)))
(.. zk (getChildren) (forPath (normalize-path path)))))
(try
(if watch?
(.. zk (getChildren) (watched) (forPath (normalize-path path)))
(.. zk (getChildren) (forPath (normalize-path path))))
(catch Exception e (throw (wrap-in-runtime e)))))

(defn set-data [^CuratorFramework zk ^String path ^bytes data]
(.. zk (setData) (forPath (normalize-path path) data)))
(try
(.. zk (setData) (forPath (normalize-path path) data))
(catch Exception e (throw (wrap-in-runtime e)))))

(defn exists [^CuratorFramework zk ^String path watch?]
(exists-node? zk path watch?))
Expand Down
Loading