From fd67fe75416a30d52991e5ffa80e311d7112cc57 Mon Sep 17 00:00:00 2001 From: Bill Graham Date: Mon, 10 Apr 2017 10:37:00 -0700 Subject: [PATCH 1/6] Hackweek project to get Trident working in Heron --- WORKSPACE | 12 + .../java/com/twitter/heron/api/Config.java | 2 +- .../com/twitter/heron/api/HeronTopology.java | 4 +- .../api/topology/BaseComponentDeclarer.java | 24 +- .../heron/api/topology/BoltDeclarer.java | 7 +- heron/examples/src/java/BUILD | 1 + .../examples/TridentWordCountTopology.java | 88 ++ heron/executor/src/python/heron_executor.py | 6 +- .../bolt/BoltOutputCollectorImpl.java | 11 +- .../heron/network/StreamManagerClient.java | 9 +- heron/stmgr/src/cpp/grouping/grouping.cpp | 6 +- heron/storm/src/java/BUILD | 7 + .../src/java/org/apache/storm/Config.java | 58 ++ .../storm/generated/GlobalStreamId.java | 29 +- .../storm/task/GeneralTopologyContext.java | 64 +- .../apache/storm/task/TopologyContext.java | 34 +- .../storm/topology/BoltDeclarerImpl.java | 61 ++ .../ComponentConfigurationDeclarer.java | 3 +- .../apache/storm/topology/InputDeclarer.java | 8 +- .../storm/topology/ResourceDeclarer.java | 30 + .../storm/topology/SpoutDeclarerImpl.java | 15 + .../java/org/apache/storm/trident/README.md | 49 ++ .../topology/MasterBatchCoordinator.java | 289 +++++++ .../trident/topology/TridentBoltExecutor.java | 442 ++++++++++ .../topology/TridentTopologyBuilder.java | 756 ++++++++++++++++++ .../java/org/apache/storm/utils/Utils.java | 302 +++++++ 26 files changed, 2284 insertions(+), 33 deletions(-) create mode 100644 heron/examples/src/java/com/twitter/heron/examples/TridentWordCountTopology.java create mode 100644 heron/storm/src/java/org/apache/storm/topology/ResourceDeclarer.java create mode 100644 heron/storm/src/java/org/apache/storm/trident/README.md create mode 100644 heron/storm/src/java/org/apache/storm/trident/topology/MasterBatchCoordinator.java create mode 100644 heron/storm/src/java/org/apache/storm/trident/topology/TridentBoltExecutor.java create mode 100644 heron/storm/src/java/org/apache/storm/trident/topology/TridentTopologyBuilder.java diff --git a/WORKSPACE b/WORKSPACE index 01431a9d9e9..cdf7568728c 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -276,6 +276,18 @@ maven_jar( artifact = "org.apache.reef:tang:" + reef_version ) +# added for trident prototype +maven_jar( + name = "org_clojure_clojure", + artifact = "org.clojure:clojure:1.7.0" +) + +# added for trident prototype +maven_jar( + name = "org_apache_storm_core", + artifact = "org.apache.storm:storm-core:1.0.0" +) + maven_jar( name = "org_apache_thrift_libthrift", artifact = "org.apache.thrift:libthrift:0.5.0-1", diff --git a/heron/api/src/java/com/twitter/heron/api/Config.java b/heron/api/src/java/com/twitter/heron/api/Config.java index 23af1ee256a..fcef51c84c2 100644 --- a/heron/api/src/java/com/twitter/heron/api/Config.java +++ b/heron/api/src/java/com/twitter/heron/api/Config.java @@ -506,7 +506,7 @@ public void setComponentJvmOptions(String component, String jvmOptions) { setComponentJvmOptions(this, component, jvmOptions); } - public Set getApiVars() { + public static Set getApiVars() { return apiVars; } } diff --git a/heron/api/src/java/com/twitter/heron/api/HeronTopology.java b/heron/api/src/java/com/twitter/heron/api/HeronTopology.java index 0699d4f99e4..9cdb04b40aa 100644 --- a/heron/api/src/java/com/twitter/heron/api/HeronTopology.java +++ b/heron/api/src/java/com/twitter/heron/api/HeronTopology.java @@ -37,9 +37,11 @@ public HeronTopology(TopologyAPI.Topology.Builder topologyBuilder) { 
this.topologyBuilder = topologyBuilder; } + // TODO: share this logic with BaseComponentDeclarer.dump(), which is identical. Differences + // between the two have caused hard to troubleshoot bugs. private static TopologyAPI.Config.Builder getConfigBuilder(Config config) { TopologyAPI.Config.Builder cBldr = TopologyAPI.Config.newBuilder(); - Set apiVars = config.getApiVars(); + Set apiVars = Config.getApiVars(); for (String key : config.keySet()) { Object value = config.get(key); TopologyAPI.Config.KeyValue.Builder b = TopologyAPI.Config.KeyValue.newBuilder(); diff --git a/heron/api/src/java/com/twitter/heron/api/topology/BaseComponentDeclarer.java b/heron/api/src/java/com/twitter/heron/api/topology/BaseComponentDeclarer.java index aa1059acaf8..430f094f2d9 100644 --- a/heron/api/src/java/com/twitter/heron/api/topology/BaseComponentDeclarer.java +++ b/heron/api/src/java/com/twitter/heron/api/topology/BaseComponentDeclarer.java @@ -16,6 +16,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.logging.Logger; import com.google.protobuf.ByteString; @@ -64,20 +65,29 @@ public void dump(TopologyAPI.Component.Builder bldr) { bldr.setSpec(TopologyAPI.ComponentObjectSpec.JAVA_SERIALIZED_OBJECT); bldr.setSerializedObject(ByteString.copyFrom(Utils.serialize(component))); + // TODO: share this logic with HeronTopology.getConfigBuilder, which is identical. Differences + // between the two have caused hard to troubleshoot bugs. + Set apiVars = Config.getApiVars(); TopologyAPI.Config.Builder cBldr = TopologyAPI.Config.newBuilder(); - for (Map.Entry entry : componentConfiguration.entrySet()) { - if (entry.getKey() == null) { + for (String key : componentConfiguration.keySet()) { + Object value = componentConfiguration.get(key); + if (key == null) { LOG.warning("ignore: config key is null"); continue; } - if (entry.getValue() == null) { - LOG.warning("ignore: config key " + entry.getKey() + " has null value"); + if (value == null) { + LOG.warning("ignore: config key " + key + " has null value"); continue; } TopologyAPI.Config.KeyValue.Builder kvBldr = TopologyAPI.Config.KeyValue.newBuilder(); - kvBldr.setKey(entry.getKey()); - kvBldr.setValue(entry.getValue().toString()); - kvBldr.setType(TopologyAPI.ConfigValueType.STRING_VALUE); + kvBldr.setKey(key); + if (apiVars.contains(key)) { + kvBldr.setType(TopologyAPI.ConfigValueType.STRING_VALUE); + kvBldr.setValue(value.toString()); + } else { + kvBldr.setType(TopologyAPI.ConfigValueType.JAVA_SERIALIZED_VALUE); + kvBldr.setSerializedValue(ByteString.copyFrom(Utils.serialize(value))); + } cBldr.addKvs(kvBldr); } bldr.setConfig(cBldr); diff --git a/heron/api/src/java/com/twitter/heron/api/topology/BoltDeclarer.java b/heron/api/src/java/com/twitter/heron/api/topology/BoltDeclarer.java index 2d786b199d7..9e00d613304 100644 --- a/heron/api/src/java/com/twitter/heron/api/topology/BoltDeclarer.java +++ b/heron/api/src/java/com/twitter/heron/api/topology/BoltDeclarer.java @@ -151,8 +151,11 @@ public BoltDeclarer directGrouping(String componentName) { } public BoltDeclarer directGrouping(String componentName, String streamId) { - // TODO:- revisit this - throw new RuntimeException("direct Grouping not implemented"); + TopologyAPI.InputStream.Builder bldr = TopologyAPI.InputStream.newBuilder(); + bldr.setStream( + TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName)); + bldr.setGtype(TopologyAPI.Grouping.DIRECT); + return grouping(bldr); } public BoltDeclarer customGrouping(String componentName, 
CustomStreamGrouping grouping) {
diff --git a/heron/examples/src/java/BUILD b/heron/examples/src/java/BUILD
index 9b5fad5280d..2becdc632f6 100644
--- a/heron/examples/src/java/BUILD
+++ b/heron/examples/src/java/BUILD
@@ -7,6 +7,7 @@ java_binary(
         "//heron/api/src/java:api-java",
         "//heron/common/src/java:basics-java",
         "//heron/storm/src/java:storm-compatibility-java",
+        "@org_apache_storm_core//jar",
     ],
     create_executable = 0,
 )
diff --git a/heron/examples/src/java/com/twitter/heron/examples/TridentWordCountTopology.java b/heron/examples/src/java/com/twitter/heron/examples/TridentWordCountTopology.java
new file mode 100644
index 00000000000..a95f32b45e1
--- /dev/null
+++ b/heron/examples/src/java/com/twitter/heron/examples/TridentWordCountTopology.java
@@ -0,0 +1,85 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+package com.twitter.heron.examples;
+
+import java.util.Collections;
+
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.AlreadyAliveException;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.builtin.Count;
+import org.apache.storm.trident.testing.FixedBatchSpout;
+import org.apache.storm.trident.testing.MemoryMapState;
+import org.apache.storm.trident.testing.Split;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+/**
+ * A streaming word count topology used to prototype Trident support on Heron.
+ */
+public class TridentWordCountTopology {
+
+  public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
+    if (args.length < 1) {
+      throw new RuntimeException("Specify topology name");
+    }
+
+    int parallelism = 1;
+    if (args.length > 1) {
+      parallelism = Integer.parseInt(args[1]);
+    }
+
+    @SuppressWarnings("unchecked")
+    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
+        new Values("the cow jumped over the moon"),
+        new Values("the man went to the store and bought some candy"),
+        new Values("four score and seven years ago"),
+        new Values("how many apples can you eat"));
+    spout.setCycle(true);
+
+    // The spout cycles through this set of sentences over and over to produce the sentence
+    // stream. Here's the streaming word count part of the computation:
+    TridentTopology topology = new TridentTopology();
+    TridentState wordCounts =
+        topology.newStream("spout1", spout)
+            .each(new Fields("sentence"), new Split(), new Fields("word"))
+            .groupBy(new Fields("word"))
+            .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
+            .parallelismHint(parallelism);
+
+    Config conf = new Config();
+    conf.setDebug(true);
+
+    // TODO: for some reason this is automatically added to spouts but not bolts...
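+    // None of the transactional settings below have workable defaults in this prototype;
+    // instances fail violently at startup when they are missing (see
+    // heron/storm/src/java/org/apache/storm/trident/README.md).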
+ conf.put(Config.TOPOLOGY_KRYO_REGISTER, Collections.singletonList( + "org.apache.storm.trident.topology.TransactionAttempt")); + conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Collections.singletonList("localhost")); + conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181); + conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/transaction_root"); + conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 5000); + conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 5000); + conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 5); + conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5000); + conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING, 5000); + //Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS + conf.put("topology.trident.batch.emit.interval.millis", 5000); + + StormTopology stormTopology = topology.build(); + StormSubmitter.submitTopology(args[0], conf, stormTopology); + } +} diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py index b8e4ce39d0e..e71b6f6fee0 100755 --- a/heron/executor/src/python/heron_executor.py +++ b/heron/executor/src/python/heron_executor.py @@ -454,7 +454,11 @@ def _get_java_instance_cmd(self, instance_info): '-XX:+HeapDumpOnOutOfMemoryError', '-XX:+UseConcMarkSweepGC', '-XX:ParallelGCThreads=4', - '-Xloggc:log-files/gc.%s.log' % instance_id] + '-Xloggc:log-files/gc.%s.log' % instance_id.replace("$", "")] + if global_task_id == -1: # Used to enable debugging of specific instances + instance_cmd =\ + instance_cmd + ["-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"] + instance_cmd = instance_cmd + self.instance_jvm_opts.split() if component_name in self.component_jvm_opts: instance_cmd = instance_cmd + self.component_jvm_opts[component_name].split() diff --git a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java index 10a61e94360..30604df6c1f 100644 --- a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java +++ b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java @@ -14,6 +14,7 @@ package com.twitter.heron.instance.bolt; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -108,7 +109,9 @@ public void emitDirect( String streamId, Collection anchors, List tuple) { - throw new RuntimeException("emitDirect not supported"); + LOG.warning(String.format("emitDirect not supported so emitting to stream %s as part of " + + "a prototype. This topology is likely broken.", streamId)); + emit(streamId, anchors, tuple); } @Override @@ -169,7 +172,7 @@ private List admitBoltTuple( // set the key. This is mostly ignored bldr.setKey(0); - List customGroupingTargetTaskIds = null; + List customGroupingTargetTaskIds = new ArrayList<>(); if (!helper.isCustomGroupingEmpty()) { // customGroupingTargetTaskIds will be null if this stream is not CustomStreamGrouping customGroupingTargetTaskIds = @@ -218,8 +221,8 @@ private List admitBoltTuple( // Update metrics boltMetrics.emittedTuple(streamId); - // TODO:- remove this after changing the api - return null; + // TODO: this used to return null. modified to make Trident work. 
Verify this is correct.
+    return customGroupingTargetTaskIds;
   }
 
   private void admitAckTuple(Tuple tuple) {
diff --git a/heron/instance/src/java/com/twitter/heron/network/StreamManagerClient.java b/heron/instance/src/java/com/twitter/heron/network/StreamManagerClient.java
index 3072be1ba85..945706beba6 100644
--- a/heron/instance/src/java/com/twitter/heron/network/StreamManagerClient.java
+++ b/heron/instance/src/java/com/twitter/heron/network/StreamManagerClient.java
@@ -239,6 +239,8 @@ private void sendStreamMessageIfNeeded() {
     }
   }
 
+  private long lastNotConnectedLogTime = 0;
+  private long throttleSeconds = 5;
   private void readStreamMessageIfNeeded() {
     // If client is not connected, just return
     if (isConnected()) {
@@ -249,7 +251,12 @@
         stopReading();
       }
     } else {
-      LOG.info("Stop reading due to not yet connected to Stream Manager.");
+      long now = System.currentTimeMillis();
+      if (now - lastNotConnectedLogTime > throttleSeconds * 1000) {
+        LOG.info(String.format("Stop reading due to not yet connected to Stream Manager. This "
+            + "message is throttled to emit no more than once every %d seconds.", throttleSeconds));
+        lastNotConnectedLogTime = now;
+      }
     }
   }
diff --git a/heron/stmgr/src/cpp/grouping/grouping.cpp b/heron/stmgr/src/cpp/grouping/grouping.cpp
index b84f32f7f73..c90a252cfd4 100644
--- a/heron/stmgr/src/cpp/grouping/grouping.cpp
+++ b/heron/stmgr/src/cpp/grouping/grouping.cpp
@@ -68,8 +68,9 @@ Grouping* Grouping::Create(proto::api::Grouping grouping_, const proto::api::Inp
     }
 
     case proto::api::DIRECT: {
-      LOG(FATAL) << "Direct grouping not supported";
-      return NULL;  // keep compiler happy
+      // TODO: do not merge in this state!
+      LOG(ERROR) << "Direct grouping not supported, faking with ShuffleGrouping for now";
+      return new ShuffleGrouping(_task_ids);
       break;
     }
diff --git a/heron/storm/src/java/BUILD b/heron/storm/src/java/BUILD
index 93f6526a67c..6e3ba022c68 100644
--- a/heron/storm/src/java/BUILD
+++ b/heron/storm/src/java/BUILD
@@ -5,6 +5,13 @@ storm_deps_files = [
     "//heron/common/src/java:basics-java",
     "//heron/simulator/src/java:simulator-java",
     "@com_googlecode_json_simple_json_simple//jar",
+    "//heron/proto:proto_topology_java",  # added for trident prototype
+    "@commons_io_commons_io//jar",  # added for trident prototype
+    "@commons_lang_commons_lang//jar",  # added for trident prototype
+    "@org_apache_storm_core//jar",  # added for trident prototype
+    "@org_clojure_clojure//jar",  # added for trident prototype
+    "@org_yaml_snakeyaml//jar",  # added for trident prototype
+    "@org_ow2_asm_asm_all//jar",  # added for trident prototype, required at runtime
     "//third_party/java:kryo",
 ]
diff --git a/heron/storm/src/java/org/apache/storm/Config.java b/heron/storm/src/java/org/apache/storm/Config.java
index 61bce1dff80..b7c3ab98dae 100644
--- a/heron/storm/src/java/org/apache/storm/Config.java
+++ b/heron/storm/src/java/org/apache/storm/Config.java
@@ -27,6 +27,7 @@
 
 import org.apache.storm.serialization.IKryoDecorator;
 import org.apache.storm.serialization.IKryoFactory;
+import org.apache.storm.validation.ConfigValidationAnnotations;
 
 /**
  * Topology configs are specified as a plain old map. This class provides a
@@ -269,6 +270,63 @@ public class Config extends com.twitter.heron.api.Config {
    */
  public static final String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port";
 
+  /**
+   * The ceiling of the interval between retries of a ZooKeeper operation.
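+   * <p>A usage sketch, with the same illustrative value the example topology uses:
+   * {@code conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING, 5000);}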
+   */
+  @ConfigValidationAnnotations.isInteger
+  public static final String STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING = "storm.zookeeper.retry.intervalceiling.millis";
+
+  /**
+   * A list of hosts of Exhibitor servers used to discover/maintain the connection to the ZooKeeper cluster.
+   * Any configured ZooKeeper servers will be used for the curator/exhibitor backup connection string.
+   */
+  @ConfigValidationAnnotations.isStringList
+  public static final String STORM_EXHIBITOR_SERVERS = "storm.exhibitor.servers";
+
+  /**
+   * The port Storm will use to connect to each of the exhibitor servers.
+   */
+  @ConfigValidationAnnotations.isInteger
+  @ConfigValidationAnnotations.isPositiveNumber
+  public static final String STORM_EXHIBITOR_PORT = "storm.exhibitor.port";
+
+  /**
+   * The URI path Storm will use when polling the Exhibitor REST endpoint.
+   */
+  @ConfigValidationAnnotations.isString
+  public static final String STORM_EXHIBITOR_URIPATH = "storm.exhibitor.poll.uripath";
+
+  /**
+   * How often to poll the Exhibitor cluster, in millis.
+   */
+  @ConfigValidationAnnotations.isInteger
+  public static final String STORM_EXHIBITOR_POLL = "storm.exhibitor.poll.millis";
+
+  /**
+   * The number of times to retry an Exhibitor operation.
+   */
+  @ConfigValidationAnnotations.isInteger
+  public static final String STORM_EXHIBITOR_RETRY_TIMES = "storm.exhibitor.retry.times";
+
+  /**
+   * The interval between retries of an Exhibitor operation.
+   */
+  @ConfigValidationAnnotations.isInteger
+  public static final String STORM_EXHIBITOR_RETRY_INTERVAL = "storm.exhibitor.retry.interval";
+
+  /**
+   * The ceiling of the interval between retries of an Exhibitor operation.
+   */
+  @ConfigValidationAnnotations.isInteger
+  public static final String STORM_EXHIBITOR_RETRY_INTERVAL_CEILING = "storm.exhibitor.retry.intervalceiling.millis";
+
+  /**
+   * How often a batch can be emitted in a Trident topology.
+   */
+  @ConfigValidationAnnotations.isInteger
+  @ConfigValidationAnnotations.isPositiveNumber
+  public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS = "topology.trident.batch.emit.interval.millis";
   /**
    * The root directory in ZooKeeper for metadata about TransactionalSpouts.
    */
diff --git a/heron/storm/src/java/org/apache/storm/generated/GlobalStreamId.java b/heron/storm/src/java/org/apache/storm/generated/GlobalStreamId.java
index 8d167329518..49770723a6b 100644
--- a/heron/storm/src/java/org/apache/storm/generated/GlobalStreamId.java
+++ b/heron/storm/src/java/org/apache/storm/generated/GlobalStreamId.java
@@ -18,7 +18,10 @@
 
 package org.apache.storm.generated;
 
-public class GlobalStreamId {
+import java.io.Serializable;
+
+public class GlobalStreamId implements Serializable {
+  private static final long serialVersionUID = 1873909238460677921L;
   private String componentId; // required
   private String streamId; // required
 
@@ -60,4 +63,28 @@ public void set_streamId(String newStreamId) {
   public void unset_streamId() {
     this.streamId = null;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    GlobalStreamId that = (GlobalStreamId) o;
+
+    if (componentId != null ? !componentId.equals(that.componentId) : that.componentId != null)
+      return false;
+    return streamId != null ? streamId.equals(that.streamId) : that.streamId == null;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = componentId != null ? componentId.hashCode() : 0;
+    result = 31 * result + (streamId != null ?
streamId.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "GlobalStreamId{componentId='" + componentId + "', streamId='" + streamId + "'}"; + } } diff --git a/heron/storm/src/java/org/apache/storm/task/GeneralTopologyContext.java b/heron/storm/src/java/org/apache/storm/task/GeneralTopologyContext.java index b674200fb58..f180020a4bd 100644 --- a/heron/storm/src/java/org/apache/storm/task/GeneralTopologyContext.java +++ b/heron/storm/src/java/org/apache/storm/task/GeneralTopologyContext.java @@ -18,14 +18,20 @@ package org.apache.storm.task; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; import org.apache.storm.generated.StormTopology; import org.apache.storm.tuple.Fields; import org.json.simple.JSONAware; +import com.twitter.heron.api.generated.TopologyAPI; + // import org.apache.storm.generated.ComponentCommon; // import org.apache.storm.generated.GlobalStreamId; // import org.apache.storm.generated.Grouping; @@ -104,10 +110,9 @@ public Fields getComponentOutputFields(String componentId, String streamId) { /** * Gets the declared output fields for the specified global stream id. */ - /* public Fields getComponentOutputFields(GlobalStreamId id) { + return getComponentOutputFields(id.get_componentId(), id.get_streamId()); } - */ /** * Gets the declared inputs to the specified component. @@ -120,6 +125,61 @@ public Map getSources(String componentId) { } */ + // TODO: this is total jank + public Map getSources(String componentId) { + Map heron = delegate.getSources(componentId); + Map converted = new NoValueMap<>(); + for (TopologyAPI.StreamId heronStreamId : heron.keySet()) { + converted.put(new GlobalStreamId(heronStreamId.getComponentName(), heronStreamId.getId()), null); + } + return converted; + } + + // TODO: this is total jank + private class NoValueMap extends HashMap { + private static final long serialVersionUID = -729425631561874300L; + + @Override + public Collection values() { + throw new RuntimeException("Values not supported. Map only being used in context where callers want keyset"); + } + + @Override + public Set> entrySet() { + throw new RuntimeException("Values not supported. Map only being used in context where callers want keyset"); + } + + @Override + public V get(Object key) { + throw new RuntimeException("Values not supported. Map only being used in context where callers want keyset"); + } + } + + /** + * Gets information about who is consuming the outputs of the specified component, + * and how. + * + * @return Map from stream id to component id to the Grouping used. + */ +// public Map> getHeronTargets(String componentId) { +// return delegate.getTargets(componentId); +// } + + public Map> getTargets(String componentId) { + Map> heron = delegate.getTargets(componentId); + Map> converted = new HashMap<>(); + for (String streamId : heron.keySet()) { + // TODO: this is total jank + Map heronGrouping = heron.get(streamId); + HashMap groupingConverted = new NoValueMap<>(); + for (String key : heronGrouping.keySet()) { + groupingConverted.put(key, null); + } + converted.put(streamId, groupingConverted); + } + return converted; + } + /** * Gets information about who is consuming the outputs of the specified component, * and how. 
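
The NoValueMap conversions above lean on the fact that Trident's callers only read the key sets of
these maps. A minimal sketch of the expected call pattern (the component id "b-1" is illustrative):

`Map<GlobalStreamId, Grouping> sources = context.getSources("b-1");
for (GlobalStreamId input : sources.keySet()) {
  // Only the key set is usable; values(), get() and entrySet() throw by design of NoValueMap.
  System.out.println(input.get_componentId() + "/" + input.get_streamId());
}
`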
diff --git a/heron/storm/src/java/org/apache/storm/task/TopologyContext.java b/heron/storm/src/java/org/apache/storm/task/TopologyContext.java index fb4da90fb4f..b88315cca1e 100644 --- a/heron/storm/src/java/org/apache/storm/task/TopologyContext.java +++ b/heron/storm/src/java/org/apache/storm/task/TopologyContext.java @@ -22,10 +22,13 @@ // import org.apache.storm.generated.Grouping; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; import org.apache.storm.generated.StormTopology; import org.apache.storm.hooks.ITaskHook; import org.apache.storm.hooks.ITaskHookDelegate; @@ -51,6 +54,7 @@ */ public class TopologyContext extends WorkerTopologyContext implements IMetricsContext { private com.twitter.heron.api.topology.TopologyContext delegate; + private Map _executorData; // Constructor to match the signature of the storm's TopologyContext // Note that here, we fake the clojure.lang.Atom by creating our own class @@ -67,11 +71,13 @@ public TopologyContext(StormTopology topology, Map stormConf, Map executorData, Map registeredMetrics, org.apache.storm.clojure.lang.Atom openOrPrepareWasCalled) { super((com.twitter.heron.api.topology.TopologyContext) null); + this._executorData = executorData; } public TopologyContext(com.twitter.heron.api.topology.TopologyContext delegate) { super(delegate); this.delegate = delegate; + this._executorData = new HashMap<>(); } /** @@ -188,16 +194,28 @@ public Map getThisSources() { } */ + public Map getThisSources() { + return getSources(getThisComponentId()); + } + /* * Gets information about who is consuming the outputs of this component, and how. * * @return Map from stream id to component id to the Grouping used. */ +// public Map> getThisHeronTargets() { +// return getHeronTargets(getThisComponentId()); +// } + /* + * Gets information about who is consuming the outputs of this component, and how. + * + * @return Map from stream id to component id to the Grouping used. 
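+   * <p>Note: in this prototype the Grouping values in the returned maps are placeholders
+   * (see NoValueMap in GeneralTopologyContext); only the key sets are safe to read.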
+ */ public Map> getThisTargets() { return getTargets(getThisComponentId()); } - */ + public void setTaskData(String name, Object data) { delegate.setTaskData(name, data); } @@ -206,15 +224,13 @@ public Object getTaskData(String name) { return delegate.getTaskData(name); } - /* - public void setExecutorData(String name, Object data) { - _executorData.put(name, data); - } + public void setExecutorData(String name, Object data) { + _executorData.put(name, data); + } - public Object getExecutorData(String name) { - return _executorData.get(name); - } - */ + public Object getExecutorData(String name) { + return _executorData.get(name); + } public void addTaskHook(ITaskHook newHook) { Collection hooks = delegate.getHooks(); diff --git a/heron/storm/src/java/org/apache/storm/topology/BoltDeclarerImpl.java b/heron/storm/src/java/org/apache/storm/topology/BoltDeclarerImpl.java index f4aaa65edbc..b42e2691b69 100644 --- a/heron/storm/src/java/org/apache/storm/topology/BoltDeclarerImpl.java +++ b/heron/storm/src/java/org/apache/storm/topology/BoltDeclarerImpl.java @@ -19,13 +19,18 @@ package org.apache.storm.topology; import java.util.Map; +import java.util.logging.Logger; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; import org.apache.storm.grouping.CustomStreamGrouping; import org.apache.storm.grouping.CustomStreamGroupingDelegate; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; public class BoltDeclarerImpl implements BoltDeclarer { + private static final Logger LOG = Logger.getLogger(BoltDeclarerImpl.class.getName()); + private com.twitter.heron.api.topology.BoltDeclarer delegate; public BoltDeclarerImpl(com.twitter.heron.api.topology.BoltDeclarer delegate) { @@ -157,4 +162,60 @@ public BoltDeclarer customGrouping( delegate.customGrouping(componentId, streamId, new CustomStreamGroupingDelegate(grouping)); return this; } + + @Override + public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) { + throw new RuntimeException("partialKeyGrouping not supported"); + } + + @Override + public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) { + throw new RuntimeException("partialKeyGrouping not supported"); + } + + @Override + public BoltDeclarer grouping(GlobalStreamId id, Grouping grouping) { + switch (grouping.getSetField()) { + case ALL: + return allGrouping(id.get_componentId(), id.get_streamId()); + case DIRECT: + return directGrouping(id.get_componentId(), id.get_streamId()); + case FIELDS: + return fieldsGrouping(id.get_componentId(), id.get_streamId(), new Fields(grouping.get_fields())); + case LOCAL_OR_SHUFFLE: + return localOrShuffleGrouping(id.get_componentId(), id.get_streamId()); + case SHUFFLE: + return shuffleGrouping(id.get_componentId(), id.get_streamId()); + case NONE: + return noneGrouping(id.get_componentId(), id.get_streamId()); + case CUSTOM_SERIALIZED: + grouping.get_custom_serialized(); + LOG.warning(String.format( + "%s.grouping(GlobalStreamId id, Grouping grouping) not supported for %s, swapping in " + + "noneGrouping. 
The tuple stream routing will be broken for streamId %s",
+            getClass().getName(), grouping.getSetField(), id));
+        return noneGrouping(id.get_componentId(), id.get_streamId());
+      case CUSTOM_OBJECT:
+        //grouping.get_custom_object();
+      default:
+        throw new RuntimeException(
+            "grouping(GlobalStreamId id, Grouping grouping) not supported for "
+            + grouping.getSetField());
+    }
+  }
+
+  @Override
+  public BoltDeclarer setMemoryLoad(Number onHeap) {
+    return this;  // Heron does not support memory hints; no-op keeps the fluent API chainable
+  }
+
+  @Override
+  public BoltDeclarer setMemoryLoad(Number onHeap, Number offHeap) {
+    return this;  // Heron does not support memory hints; no-op keeps the fluent API chainable
+  }
+
+  @Override
+  public BoltDeclarer setCPULoad(Number amount) {
+    return this;  // Heron does not support CPU hints; no-op keeps the fluent API chainable
+  }
 }
diff --git a/heron/storm/src/java/org/apache/storm/topology/ComponentConfigurationDeclarer.java b/heron/storm/src/java/org/apache/storm/topology/ComponentConfigurationDeclarer.java
index 93eeb27a3e4..a149024ea1c 100644
--- a/heron/storm/src/java/org/apache/storm/topology/ComponentConfigurationDeclarer.java
+++ b/heron/storm/src/java/org/apache/storm/topology/ComponentConfigurationDeclarer.java
@@ -21,7 +21,8 @@
 import java.util.Map;
 
 @SuppressWarnings("rawtypes")
-public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> {
+public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer>
+    extends ResourceDeclarer<T> {
   T addConfigurations(Map conf);
   T addConfiguration(String config, Object value);
diff --git a/heron/storm/src/java/org/apache/storm/topology/InputDeclarer.java b/heron/storm/src/java/org/apache/storm/topology/InputDeclarer.java
index ca781a2b7a8..0a766fed99e 100644
--- a/heron/storm/src/java/org/apache/storm/topology/InputDeclarer.java
+++ b/heron/storm/src/java/org/apache/storm/topology/InputDeclarer.java
@@ -18,6 +18,8 @@
 
 package org.apache.storm.topology;
 
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.generated.Grouping;
 import org.apache.storm.grouping.CustomStreamGrouping;
 import org.apache.storm.tuple.Fields;
 
@@ -57,5 +59,9 @@ public interface InputDeclarer<T extends InputDeclarer> {
   T customGrouping(String componentId, String streamId, CustomStreamGrouping grouping);
 
-  // T grouping(GlobalStreamId id, Grouping grouping);
+  T partialKeyGrouping(String componentId, Fields fields);
+
+  T partialKeyGrouping(String componentId, String streamId, Fields fields);
+
+  T grouping(GlobalStreamId id, Grouping grouping);
 }
diff --git a/heron/storm/src/java/org/apache/storm/topology/ResourceDeclarer.java b/heron/storm/src/java/org/apache/storm/topology/ResourceDeclarer.java
new file mode 100644
index 00000000000..37dc917849c
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/topology/ResourceDeclarer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.topology;
+
+/**
+ * This is a new base interface that can be used by anything that wants to mirror
+ * RAS's (the Resource Aware Scheduler's) basic API. Trident uses this to allow
+ * setting resources in the Stream API.
+ */
+@SuppressWarnings("rawtypes")
+public interface ResourceDeclarer<T extends ResourceDeclarer> {
+  T setMemoryLoad(Number onHeap);
+  T setMemoryLoad(Number onHeap, Number offHeap);
+  T setCPULoad(Number amount);
+}
+
diff --git a/heron/storm/src/java/org/apache/storm/topology/SpoutDeclarerImpl.java b/heron/storm/src/java/org/apache/storm/topology/SpoutDeclarerImpl.java
index 28dcc901889..29aa08fa2d9 100644
--- a/heron/storm/src/java/org/apache/storm/topology/SpoutDeclarerImpl.java
+++ b/heron/storm/src/java/org/apache/storm/topology/SpoutDeclarerImpl.java
@@ -63,4 +63,19 @@ public SpoutDeclarer setNumTasks(Number val) {
     // Heron does not support this
     return this;
   }
+
+  @Override
+  public SpoutDeclarer setMemoryLoad(Number onHeap) {
+    return this;  // Heron does not support memory hints; no-op keeps the fluent API chainable
+  }
+
+  @Override
+  public SpoutDeclarer setMemoryLoad(Number onHeap, Number offHeap) {
+    return this;  // Heron does not support memory hints; no-op keeps the fluent API chainable
+  }
+
+  @Override
+  public SpoutDeclarer setCPULoad(Number amount) {
+    return this;  // Heron does not support CPU hints; no-op keeps the fluent API chainable
+  }
 }
diff --git a/heron/storm/src/java/org/apache/storm/trident/README.md b/heron/storm/src/java/org/apache/storm/trident/README.md
new file mode 100644
index 00000000000..6b3e784343a
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/trident/README.md
@@ -0,0 +1,49 @@
+Hackweek prototype to get a sample trident topology working.
+
+tl;dr: Heron does not support direct grouping, which Trident requires. To get Trident to work, we
+need to either support direct grouping in Heron, or refactor Trident to not require it (if that's
+possible).
+
+To run:
+`~/bin/heron kill local TridentWordCountTopology && rm -rf ~/.herondata/*
+bazel run --config=darwin --verbose_failures -- scripts/packages/heron-client-install.sh --user && \
+  ~/bin/heron submit local ~/.heron/examples/heron-examples.jar \
+  com.twitter.heron.examples.TridentWordCountTopology TridentWordCountTopology
+less ~/.herondata/topologies/local/billg/TridentWordCountTopology/log-files/container_1_b-1_4.log.0
+`
+
+Current status:
+- Topology compiles and can be submitted
+- DAG and streams appear to be correctly represented
+- Tuple routing is incorrect due to hacks (see below)
+- Trident/Storm do not provide reasonable defaults for configs, and instances fail violently when
+  expected configs are not set. See TridentWordCountTopology.
+- Many methods have been added/hacked just to get the topology to run
+- Failures appear in the counters for bolt b-1, but the logs don't show anything
+- Correctness is broken, for the following reasons:
+
+1. Direct grouping needs to be implemented; currently hacked around using shuffle grouping (see grouping.cpp)
+2. `com.twitter.heron.instance.bolt.BoltOutputCollectorImpl.emitDirect` is not supported and is
+   hacked to do a plain emit (a sketch of the direct-emit pattern Trident needs follows this list)
+3. `com.twitter.heron.instance.bolt.BoltOutputCollectorImpl.admitBoltTuple` changed to return task ids
+4. `BoltDeclarerImpl.grouping(GlobalStreamId id, Grouping grouping)` doesn't support `CUSTOM_SERIALIZED` properly
+5. GeneralTopologyContext does a bunch of janky stuff with NoValueMap, for callers who need key sets only
+6. ZooKeeper ACLs are not implemented in Utils.
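+
+What Trident needs from direct grouping, as a rough sketch (component and stream names here are
+illustrative, not taken from this code): the consumer subscribes with `directGrouping` and the
+producer picks the exact consumer task per tuple via `emitDirect`, which is how TridentBoltExecutor
+routes its `$coord-` bookkeeping tuples:
+
+`// Consumer side: subscribe to the coordination stream with direct grouping.
+builder.setBolt("b-1", new SomeTridentBolt(), 1)
+    .directGrouping("b-0", "$coord-bg0");
+
+// Producer side: choose the exact target task for each tuple.
+collector.emitDirect(targetTaskId, "$coord-bg0", anchorTuple, new Values(batchId, tupleCount));
+`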
+
+TODO:
+- Figure out why bolt b-1 is failing. This is likely because tuples are being mis-routed due to the
+  grouping hacks
+- Understand why direct grouping and `emitDirect` are needed and how to support them; remove the
+  hacks (see #1, #2 above)
+- Fix `admitBoltTuple` (changed to return task ids) to return the real tuple ids (see #3 above)
+- Understand why `CUSTOM_SERIALIZED` is needed and how to support it (see #4 above)
+- Figure out why `org.apache.storm.trident.topology.TransactionAttempt` is only registered as
+  `Config.TOPOLOGY_KRYO_REGISTER` in spouts and not bolts.
+- Lots of additions were made to the org.apache.storm code in heron. These implementations should
+  all be verified and in some cases fixed. The Storm code in heron also seems to drift between
+  versions; it should be pinned to a given Storm version to make upgrading to a new Storm version
+  easier. Because Storm classes are copied and modified, managing Storm versions with heron
+  modifications is currently really hard.
+- To get things working, the org.apache.storm core jar was added as a dep to the project (see
+  WORKSPACE), since it has all the trident code. We wouldn't want to do this in the long haul;
+  instead we'd probably want to include an artifact that has only the trident code.
+- Do a FULL AUDIT of all changes in the branch before even thinking about merging any of it. This is
+  prototype hackweek code, people.
diff --git a/heron/storm/src/java/org/apache/storm/trident/topology/MasterBatchCoordinator.java b/heron/storm/src/java/org/apache/storm/trident/topology/MasterBatchCoordinator.java
new file mode 100644
index 00000000000..88b36e6cc59
--- /dev/null
+++ b/heron/storm/src/java/org/apache/storm/trident/topology/MasterBatchCoordinator.java
@@ -0,0 +1,289 @@
+// Copyright 2017 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and +// limitations under the License +package org.apache.storm.trident.topology; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; + +import org.apache.storm.Config; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.trident.spout.ITridentSpout; +import org.apache.storm.trident.topology.state.TransactionalState; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.WindowedTimeThrottler; + +import com.twitter.heron.common.basics.TypeUtils; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; + +@SuppressWarnings({"unchecked", "rawtypes", "cast", "serial", "checkstyle:all"}) +public class MasterBatchCoordinator extends BaseRichSpout { +// public static final Logger LOG = LoggerFactory.getLogger(MasterBatchCoordinator.class); + + public static final long INIT_TXID = 1L; + + + public static final String BATCH_STREAM_ID = "$batch"; + public static final String COMMIT_STREAM_ID = "$commit"; + public static final String SUCCESS_STREAM_ID = "$success"; + + private static final String CURRENT_TX = "currtx"; + private static final String CURRENT_ATTEMPTS = "currattempts"; + + private List _states = new ArrayList(); + + TreeMap _activeTx = new TreeMap(); + TreeMap _attemptIds; + + private SpoutOutputCollector _collector; + Long _currTransaction; + int _maxTransactionActive; + + List _coordinators = new ArrayList(); + + + List _managedSpoutIds; + List _spouts; + WindowedTimeThrottler _throttler; + + boolean _active = true; + + public MasterBatchCoordinator(List spoutIds, List spouts) { + if(spoutIds.isEmpty()) { + throw new IllegalArgumentException("Must manage at least one spout"); + } + _managedSpoutIds = spoutIds; + _spouts = spouts; + } + + public List getManagedSpoutIds(){ + return _managedSpoutIds; + } + + @Override + public void activate() { + _active = true; + } + + @Override + public void deactivate() { + _active = false; + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + _throttler = new WindowedTimeThrottler((Number)conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1); + for(String spoutId: _managedSpoutIds) { + _states.add(TransactionalState.newCoordinatorState(conf, spoutId)); + } + _currTransaction = getStoredCurrTransaction(); + + _collector = collector; + // only heron-specifc change in this class to use TypeUtils + Number active = TypeUtils.getInteger(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)); + if(active==null) { + _maxTransactionActive = 1; + } else { + _maxTransactionActive = active.intValue(); + } + _attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive); + + + for(int i=0; i<_spouts.size(); i++) { + String txId = _managedSpoutIds.get(i); + _coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context)); + } + } + + @Override + public void close() { + for(TransactionalState state: _states) { + state.close(); + } + } + + @Override + public void nextTuple() { + sync(); + } + + @Override + public void ack(Object msgId) { + TransactionAttempt tx = (TransactionAttempt) msgId; + TransactionStatus status = _activeTx.get(tx.getTransactionId()); + if(status!=null && 
tx.equals(status.attempt)) { + if(status.status== AttemptStatus.PROCESSING) { + status.status = AttemptStatus.PROCESSED; + } else if(status.status== AttemptStatus.COMMITTING) { + _activeTx.remove(tx.getTransactionId()); + _attemptIds.remove(tx.getTransactionId()); + _collector.emit(SUCCESS_STREAM_ID, new Values(tx)); + _currTransaction = nextTransactionId(tx.getTransactionId()); + for(TransactionalState state: _states) { + state.setData(CURRENT_TX, _currTransaction); + } + } + sync(); + } + } + + @Override + public void fail(Object msgId) { + TransactionAttempt tx = (TransactionAttempt) msgId; + TransactionStatus stored = _activeTx.remove(tx.getTransactionId()); + if(stored!=null && tx.equals(stored.attempt)) { + _activeTx.tailMap(tx.getTransactionId()).clear(); + sync(); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // in partitioned example, in case an emitter task receives a later transaction than it's emitted so far, + // when it sees the earlier txid it should know to emit nothing + declarer.declareStream(BATCH_STREAM_ID, new Fields("tx")); + declarer.declareStream(COMMIT_STREAM_ID, new Fields("tx")); + declarer.declareStream(SUCCESS_STREAM_ID, new Fields("tx")); + } + + private void sync() { + // note that sometimes the tuples active may be less than max_spout_pending, e.g. + // max_spout_pending = 3 + // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet), + // and there won't be a batch for tx 4 because there's max_spout_pending tx active + TransactionStatus maybeCommit = _activeTx.get(_currTransaction); + if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) { + maybeCommit.status = AttemptStatus.COMMITTING; + _collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt); + } + + if(_active) { + if(_activeTx.size() < _maxTransactionActive) { + Long curr = _currTransaction; + for(int i=0; i<_maxTransactionActive; i++) { + if(!_activeTx.containsKey(curr) && isReady(curr)) { + // by using a monotonically increasing attempt id, downstream tasks + // can be memory efficient by clearing out state for old attempts + // as soon as they see a higher attempt id for a transaction + Integer attemptId = _attemptIds.get(curr); + if(attemptId==null) { + attemptId = 0; + } else { + attemptId++; + } + _attemptIds.put(curr, attemptId); + for(TransactionalState state: _states) { + state.setData(CURRENT_ATTEMPTS, _attemptIds); + } + + TransactionAttempt attempt = new TransactionAttempt(curr, attemptId); + _activeTx.put(curr, new TransactionStatus(attempt)); + _collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt); + _throttler.markEvent(); + } + curr = nextTransactionId(curr); + } + } + } + } + + private boolean isReady(long txid) { + if(_throttler.isThrottled()) return false; + //TODO: make this strategy configurable?... 
right now it goes if anyone is ready + for(ITridentSpout.BatchCoordinator coord: _coordinators) { + if(coord.isReady(txid)) return true; + } + return false; + } + + @Override + public Map getComponentConfiguration() { + Config ret = new Config(); + ret.setMaxTaskParallelism(1); + ret.registerSerialization(TransactionAttempt.class); + return ret; + } + + private static enum AttemptStatus { + PROCESSING, + PROCESSED, + COMMITTING + } + + private static class TransactionStatus { + TransactionAttempt attempt; + AttemptStatus status; + + public TransactionStatus(TransactionAttempt attempt) { + this.attempt = attempt; + this.status = AttemptStatus.PROCESSING; + } + + @Override + public String toString() { + return attempt.toString() + " <" + status.toString() + ">"; + } + } + + + private Long nextTransactionId(Long id) { + return id + 1; + } + + private Long getStoredCurrTransaction() { + Long ret = INIT_TXID; + for(TransactionalState state: _states) { + Long curr = (Long) state.getData(CURRENT_TX); + if(curr!=null && curr.compareTo(ret) > 0) { + ret = curr; + } + } + return ret; + } + + private TreeMap getStoredCurrAttempts(long currTransaction, int maxBatches) { + TreeMap ret = new TreeMap(); + for(TransactionalState state: _states) { + Map attempts = (Map) state.getData(CURRENT_ATTEMPTS); + if(attempts==null) attempts = new HashMap(); + for(Entry e: attempts.entrySet()) { + // this is because json doesn't allow numbers as keys... + // TODO: replace json with a better form of encoding + Number txidObj; + if(e.getKey() instanceof String) { + txidObj = Long.parseLong((String) e.getKey()); + } else { + txidObj = (Number) e.getKey(); + } + long txid = ((Number) txidObj).longValue(); + int attemptId = ((Number) e.getValue()).intValue(); + Integer curr = ret.get(txid); + if(curr==null || attemptId > curr) { + ret.put(txid, attemptId); + } + } + } + ret.headMap(currTransaction).clear(); + ret.tailMap(currTransaction + maxBatches - 1).clear(); + return ret; + } +} diff --git a/heron/storm/src/java/org/apache/storm/trident/topology/TridentBoltExecutor.java b/heron/storm/src/java/org/apache/storm/trident/topology/TridentBoltExecutor.java new file mode 100644 index 00000000000..f30d69aaad1 --- /dev/null +++ b/heron/storm/src/java/org/apache/storm/trident/topology/TridentBoltExecutor.java @@ -0,0 +1,442 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.trident.topology; + +import org.apache.storm.Config; +import org.apache.storm.coordination.BatchOutputCollector; +import org.apache.storm.coordination.BatchOutputCollectorImpl; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.FailedException; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.ReportedFailedException; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.TupleUtils; +import org.apache.storm.utils.Utils; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.storm.trident.spout.IBatchID; + +@SuppressWarnings({"unchecked", "rawtypes", "serial", "cast"}) +public class TridentBoltExecutor implements IRichBolt { + public static final String COORD_STREAM_PREFIX = "$coord-"; + + public static String COORD_STREAM(String batch) { + return COORD_STREAM_PREFIX + batch; + } + + public static class CoordType implements Serializable { + public boolean singleCount; + + protected CoordType(boolean singleCount) { + this.singleCount = singleCount; + } + + public static CoordType single() { + return new CoordType(true); + } + + public static CoordType all() { + return new CoordType(false); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof CoordType)) return false; + + CoordType coordType = (CoordType) o; + + return singleCount == coordType.singleCount; + } + + @Override + public int hashCode() { + return (singleCount ? 
1 : 0); + } + + @Override + public String toString() { + return ""; + } + } + + public static class CoordSpec implements Serializable { + public GlobalStreamId commitStream = null; + public Map coords = new HashMap<>(); + + public CoordSpec() { + } + } + + public static class CoordCondition implements Serializable { + public GlobalStreamId commitStream; + public int expectedTaskReports; + Set targetTasks; + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } + } + + Map _batchGroupIds; + Map _coordSpecs; + Map _coordConditions; + ITridentBatchBolt _bolt; + long _messageTimeoutMs; + long _lastRotate; + + RotatingMap _batches; + + // map from batchgroupid to coordspec + public TridentBoltExecutor(ITridentBatchBolt bolt, Map batchGroupIds, Map coordinationSpecs) { + _batchGroupIds = batchGroupIds; + _coordSpecs = coordinationSpecs; + _bolt = bolt; + } + + public static class TrackedBatch { + int attemptId; + BatchInfo info; + CoordCondition condition; + int reportedTasks = 0; + int expectedTupleCount = 0; + int receivedTuples = 0; + Map taskEmittedTuples = new HashMap<>(); + boolean failed = false; + boolean receivedCommit; + Tuple delayedAck = null; + + public TrackedBatch(BatchInfo info, CoordCondition condition, int attemptId) { + this.info = info; + this.condition = condition; + this.attemptId = attemptId; + receivedCommit = condition.commitStream == null; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } + } + + private static class CoordinatedOutputCollector implements IOutputCollector { + IOutputCollector _delegate; + + TrackedBatch _currBatch = null; + + public void setCurrBatch(TrackedBatch batch) { + _currBatch = batch; + } + + public CoordinatedOutputCollector(IOutputCollector delegate) { + _delegate = delegate; + } + + public List emit(String stream, Collection anchors, List tuple) { + List tasks = _delegate.emit(stream, anchors, tuple); + updateTaskCounts(tasks); + return tasks; + } + + public void emitDirect(int task, String stream, Collection anchors, List tuple) { + updateTaskCounts(Arrays.asList(task)); + _delegate.emitDirect(task, stream, anchors, tuple); + } + + public void ack(Tuple tuple) { + throw new IllegalStateException("Method should never be called"); + } + + public void fail(Tuple tuple) { + throw new IllegalStateException("Method should never be called"); + } + + public void resetTimeout(Tuple tuple) { + throw new IllegalStateException("Method should never be called"); + } + + public void reportError(Throwable error) { + _delegate.reportError(error); + } + + + private void updateTaskCounts(List tasks) { + if(_currBatch!=null) { + Map taskEmittedTuples = _currBatch.taskEmittedTuples; + for(Integer task: tasks) { + int newCount = Utils.get(taskEmittedTuples, task, 0) + 1; + taskEmittedTuples.put(task, newCount); + } + } + } + } + + OutputCollector _collector; + CoordinatedOutputCollector _coordCollector; + BatchOutputCollector _coordOutputCollector; + TopologyContext _context; + + @Override + public void prepare(Map conf, TopologyContext context, OutputCollector collector) { + _messageTimeoutMs = 30000; // TODO support this context.maxTopologyMessageTimeout() * 1000L; + _lastRotate = System.currentTimeMillis(); + _batches = new RotatingMap<>(2); + _context = context; + _collector = collector; + _coordCollector = new CoordinatedOutputCollector(collector); + _coordOutputCollector = new BatchOutputCollectorImpl(new OutputCollector(_coordCollector)); + + _coordConditions = 
(Map) context.getExecutorData("__coordConditions");
+        if(_coordConditions==null) {
+            _coordConditions = new HashMap<>();
+            for(String batchGroup: _coordSpecs.keySet()) {
+                CoordSpec spec = _coordSpecs.get(batchGroup);
+                CoordCondition cond = new CoordCondition();
+                cond.commitStream = spec.commitStream;
+                cond.expectedTaskReports = 0;
+                for(String comp: spec.coords.keySet()) {
+                    CoordType ct = spec.coords.get(comp);
+                    if(ct.equals(CoordType.single())) {
+                        cond.expectedTaskReports+=1;
+                    } else {
+                        cond.expectedTaskReports+=context.getComponentTasks(comp).size();
+                    }
+                }
+                cond.targetTasks = new HashSet<>();
+                // get all downstream consumers of this component of the stream COORD_STREAM(batchGroup)
+                for(String component: Utils.get(context.getThisTargets(),
+                        COORD_STREAM(batchGroup),
+                        new HashMap()).keySet()) {
+                    cond.targetTasks.addAll(context.getComponentTasks(component));
+                }
+                _coordConditions.put(batchGroup, cond);
+            }
+            context.setExecutorData("__coordConditions", _coordConditions);
+        }
+        _bolt.prepare(conf, context, _coordOutputCollector);
+    }
+
+    private void failBatch(TrackedBatch tracked, FailedException e) {
+        if(e!=null && e instanceof ReportedFailedException) {
+            _collector.reportError(e);
+        }
+        tracked.failed = true;
+        if(tracked.delayedAck!=null) {
+            _collector.fail(tracked.delayedAck);
+            tracked.delayedAck = null;
+        }
+    }
+
+    private void failBatch(TrackedBatch tracked) {
+        failBatch(tracked, null);
+    }
+
+    private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {
+        boolean success = true;
+        try {
+            _bolt.finishBatch(tracked.info);
+            String stream = COORD_STREAM(tracked.info.batchGroup);
+            for(Integer task: tracked.condition.targetTasks) {
+                _collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));
+            }
+            if(tracked.delayedAck!=null) {
+                _collector.ack(tracked.delayedAck);
+                tracked.delayedAck = null;
+            }
+        } catch(FailedException e) {
+            failBatch(tracked, e);
+            success = false;
+        }
+        _batches.remove(tracked.info.batchId.getId());
+        return success;
+    }
+
+    private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {
+        if(tracked.failed) {
+            failBatch(tracked);
+            _collector.fail(tuple);
+            return;
+        }
+        CoordCondition cond = tracked.condition;
+        boolean delayed = tracked.delayedAck==null &&
+                (cond.commitStream!=null && type==TupleType.COMMIT
+                        || cond.commitStream==null);
+        if(delayed) {
+            tracked.delayedAck = tuple;
+        }
+        boolean failed = false;
+        if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {
+            if(tracked.receivedTuples == tracked.expectedTupleCount) {
+                finishBatch(tracked, tuple);
+            } else {
+                //TODO: add logging that not all tuples were received
+                failBatch(tracked);
+                _collector.fail(tuple);
+                failed = true;
+            }
+        }
+
+        if(!delayed && !failed) {
+            _collector.ack(tuple);
+        }
+
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        if(TupleUtils.isTick(tuple)) {
+            long now = System.currentTimeMillis();
+            if(now - _lastRotate > _messageTimeoutMs) {
+                _batches.rotate();
+                _lastRotate = now;
+            }
+            return;
+        }
+
+        String batchGroup = _batchGroupIds.get(new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()));
+        if(batchGroup==null) {
+            // this is so we can do things like have simple DRPC that doesn't need to use batch processing
+            _coordCollector.setCurrBatch(null);
+            _bolt.execute(null, tuple);
+            _collector.ack(tuple);
+            return;
+        }
+        IBatchID id = (IBatchID) tuple.getValue(0);
+        //get transaction id
+        //if it already
exists and attempt id is greater than the attempt there + + + TrackedBatch tracked = (TrackedBatch) _batches.get(id.getId()); +// if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) { +// System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex() +// + " (" + _batches.size() + ")" + +// "\ntuple: " + tuple + +// "\nwith tracked " + tracked + +// "\nwith id " + id + +// "\nwith group " + batchGroup +// + "\n"); +// +// } + //System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex()); + + // this code here ensures that only one attempt is ever tracked for a batch, so when + // failures happen you don't get an explosion in memory usage in the tasks + if(tracked!=null) { + if(id.getAttemptId() > tracked.attemptId) { + _batches.remove(id.getId()); + tracked = null; + } else if(id.getAttemptId() < tracked.attemptId) { + // no reason to try to execute a previous attempt than we've already seen + return; + } + } + + if(tracked==null) { + tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId()); + _batches.put(id.getId(), tracked); + } + _coordCollector.setCurrBatch(tracked); + + //System.out.println("TRACKED: " + tracked + " " + tuple); + + TupleType t = getTupleType(tuple, tracked); + if(t==TupleType.COMMIT) { + tracked.receivedCommit = true; + checkFinish(tracked, tuple, t); + } else if(t==TupleType.COORD) { + int count = tuple.getInteger(1); + tracked.reportedTasks++; + tracked.expectedTupleCount+=count; + checkFinish(tracked, tuple, t); + } else { + tracked.receivedTuples++; + boolean success = true; + try { + _bolt.execute(tracked.info, tuple); + if(tracked.condition.expectedTaskReports==0) { + success = finishBatch(tracked, tuple); + } + } catch(FailedException e) { + failBatch(tracked, e); + } + if(success) { + _collector.ack(tuple); + } else { + _collector.fail(tuple); + } + } + _coordCollector.setCurrBatch(null); + } + + @Override + public void cleanup() { + _bolt.cleanup(); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + _bolt.declareOutputFields(declarer); + for(String batchGroup: _coordSpecs.keySet()) { + declarer.declareStream(COORD_STREAM(batchGroup), true, new Fields("id", "count")); + } + } + + @Override + public Map getComponentConfiguration() { + Map ret = _bolt.getComponentConfiguration(); + if(ret==null) ret = new HashMap<>(); + ret.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5); + // TODO: Need to be able to set the tick tuple time to the message timeout, ideally without parameterization + return ret; + } + + private TupleType getTupleType(Tuple tuple, TrackedBatch batch) { + CoordCondition cond = batch.condition; + if(cond.commitStream!=null + && new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()).equals(cond.commitStream)) { + return TupleType.COMMIT; + } else if(cond.expectedTaskReports > 0 + && tuple.getSourceStreamId().startsWith(COORD_STREAM_PREFIX)) { + return TupleType.COORD; + } else { + return TupleType.REGULAR; + } + } + + static enum TupleType { + REGULAR, + COMMIT, + COORD + } +} \ No newline at end of file diff --git a/heron/storm/src/java/org/apache/storm/trident/topology/TridentTopologyBuilder.java b/heron/storm/src/java/org/apache/storm/trident/topology/TridentTopologyBuilder.java new file mode 100644 index 00000000000..5d3213bad1f --- /dev/null +++ 
b/heron/storm/src/java/org/apache/storm/trident/topology/TridentTopologyBuilder.java @@ -0,0 +1,756 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.trident.topology; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; +import org.apache.storm.generated.GlobalStreamId; +//import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.grouping.PartialKeyGrouping; +import org.apache.storm.topology.BaseConfigurationDeclarer; +import org.apache.storm.topology.BoltDeclarer; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.SpoutDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.trident.spout.BatchSpoutExecutor; +import org.apache.storm.trident.spout.IBatchSpout; +import org.apache.storm.trident.spout.ICommitterTridentSpout; +import org.apache.storm.trident.spout.ITridentSpout; +import org.apache.storm.trident.spout.RichSpoutBatchTriggerer; +import org.apache.storm.trident.spout.TridentSpoutCoordinator; +import org.apache.storm.trident.spout.TridentSpoutExecutor; +import org.apache.storm.trident.topology.TridentBoltExecutor.CoordSpec; +import org.apache.storm.trident.topology.TridentBoltExecutor.CoordType; +import org.apache.storm.tuple.Fields; + +// based on transactional topologies +@SuppressWarnings({"unchecked", "rawtypes", "checkstyle:all"}) +public class TridentTopologyBuilder { + Map _batchIds = new HashMap(); + Map _spouts = new HashMap(); + Map _batchPerTupleSpouts = new HashMap(); + Map _bolts = new HashMap(); + + + @SuppressWarnings({"unchecked", "rawtypes"}) + public SpoutDeclarer setBatchPerTupleSpout(String id, String streamName, IRichSpout spout, Integer parallelism, String batchGroup) { + Map batchGroups = new HashMap(); + batchGroups.put(streamName, batchGroup); + markBatchGroups(id, batchGroups); + SpoutComponent c = new SpoutComponent(spout, streamName, parallelism, batchGroup); + _batchPerTupleSpouts.put(id, c); + return new SpoutDeclarerImpl(c); + } + + public SpoutDeclarer setSpout(String id, String streamName, String txStateId, IBatchSpout spout, Integer parallelism, String batchGroup) { + return setSpout(id, streamName, txStateId, new BatchSpoutExecutor(spout), parallelism, batchGroup); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + public SpoutDeclarer setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) { + Map batchGroups = new HashMap(); + batchGroups.put(streamName, batchGroup); + markBatchGroups(id, batchGroups); + + TransactionalSpoutComponent c = new TransactionalSpoutComponent(spout, streamName, parallelism, txStateId, batchGroup); + _spouts.put(id, c); + return new SpoutDeclarerImpl(c); + } + + // map from stream name to batch id + public BoltDeclarer setBolt(String id, ITridentBatchBolt bolt, Integer parallelism, Set committerBatches, Map batchGroups) { + markBatchGroups(id, batchGroups); + Component c = 
new Component(bolt, parallelism, committerBatches); + _bolts.put(id, c); + return new BoltDeclarerImpl(c); + + } + + String masterCoordinator(String batchGroup) { + return "$mastercoord-" + batchGroup; + } + + static final String SPOUT_COORD_PREFIX = "$spoutcoord-"; + + public static String spoutCoordinator(String spoutId) { + return SPOUT_COORD_PREFIX + spoutId; + } + + public static String spoutIdFromCoordinatorId(String coordId) { + return coordId.substring(SPOUT_COORD_PREFIX.length()); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + Map fleshOutStreamBatchIds(boolean includeCommitStream) { + Map ret = new HashMap<>(_batchIds); + Set allBatches = new HashSet(_batchIds.values()); + for(String b: allBatches) { + ret.put(new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.BATCH_STREAM_ID), b); + if(includeCommitStream) { + ret.put(new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID), b); + } + // DO NOT include the success stream as part of the batch. it should not trigger coordination tuples, + // and is just a metadata tuple to assist in cleanup, should not trigger batch tracking + } + + for(String id: _spouts.keySet()) { + TransactionalSpoutComponent c = _spouts.get(id); + if(c.batchGroupId!=null) { + ret.put(new GlobalStreamId(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID), c.batchGroupId); + } + } + + //this takes care of setting up coord streams for spouts and bolts + for(GlobalStreamId s: _batchIds.keySet()) { + String b = _batchIds.get(s); + ret.put(new GlobalStreamId(s.get_componentId(), TridentBoltExecutor.COORD_STREAM(b)), b); + } + + return ret; + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + public StormTopology buildTopology() { + TopologyBuilder builder = new TopologyBuilder(); + Map batchIdsForSpouts = fleshOutStreamBatchIds(false); + Map batchIdsForBolts = fleshOutStreamBatchIds(true); + + Map> batchesToCommitIds = new HashMap<>(); + Map> batchesToSpouts = new HashMap<>(); + + for(String id: _spouts.keySet()) { + TransactionalSpoutComponent c = _spouts.get(id); + if(c.spout instanceof IRichSpout) { + + //TODO: wrap this to set the stream name + builder.setSpout(id, (IRichSpout) c.spout, c.parallelism); + } else { + String batchGroup = c.batchGroupId; + if(!batchesToCommitIds.containsKey(batchGroup)) { + batchesToCommitIds.put(batchGroup, new ArrayList()); + } + batchesToCommitIds.get(batchGroup).add(c.commitStateId); + + if(!batchesToSpouts.containsKey(batchGroup)) { + batchesToSpouts.put(batchGroup, new ArrayList()); + } + batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout); + + + BoltDeclarer scd = + builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout)) + .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID) + .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID); + + for(Map m: c.componentConfs) { + scd.addConfigurations(m); + } + + Map specs = new HashMap(); + specs.put(c.batchGroupId, new CoordSpec()); + BoltDeclarer bd = builder.setBolt(id, + new TridentBoltExecutor( + new TridentSpoutExecutor( + c.commitStateId, + c.streamName, + ((ITridentSpout) c.spout)), + batchIdsForSpouts, + specs), + c.parallelism); + bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID); + bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID); + if(c.spout instanceof ICommitterTridentSpout) { + 
bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID); + } + for(Map m: c.componentConfs) { + bd.addConfigurations(m); + } + } + } + + for(String id: _batchPerTupleSpouts.keySet()) { + SpoutComponent c = _batchPerTupleSpouts.get(id); + SpoutDeclarer d = builder.setSpout(id, new RichSpoutBatchTriggerer((IRichSpout) c.spout, c.streamName, c.batchGroupId), c.parallelism); + + for(Map conf: c.componentConfs) { + d.addConfigurations(conf); + } + } + + for(Map.Entry> entry: batchesToCommitIds.entrySet()) { + String batch = entry.getKey(); + List commitIds = entry.getValue(); + builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch))); + } + + for(String id: _bolts.keySet()) { + Component c = _bolts.get(id); + + Map specs = new HashMap<>(); + + for(GlobalStreamId s: getBoltSubscriptionStreams(id)) { + String batch = batchIdsForBolts.get(s); + if (batch == null) { + throw new RuntimeException(String.format( + "Batch group id not found for stream id %s in batchIdsForBolts: %s", + s, batchIdsForBolts)); + } + if(!specs.containsKey(batch)) specs.put(batch, new CoordSpec()); + CoordSpec spec = specs.get(batch); + CoordType ct; + if(_batchPerTupleSpouts.containsKey(s.get_componentId())) { + ct = CoordType.single(); + } else { + ct = CoordType.all(); + } + spec.coords.put(s.get_componentId(), ct); + } + + for(String b: c.committerBatches) { + String masterCoordinator = masterCoordinator(b); + CoordSpec spec = specs.get(b); + GlobalStreamId streamId = new GlobalStreamId(masterCoordinator, MasterBatchCoordinator.COMMIT_STREAM_ID); + spec.commitStream = streamId; + } + + BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism); + for(Map conf: c.componentConfs) { + d.addConfigurations(conf); + } + + for(InputDeclaration inputDecl: c.declarations) { + inputDecl.declare(d); + } + + Map> batchToComponents = getBoltBatchToComponentSubscriptions(id); + for(Map.Entry> entry: batchToComponents.entrySet()) { + for(String comp: entry.getValue()) { + d.directGrouping(comp, TridentBoltExecutor.COORD_STREAM(entry.getKey())); + } + } + + for(String b: c.committerBatches) { + d.allGrouping(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID); + } + } + + return builder.createTopology(); + } + + private void markBatchGroups(String component, Map batchGroups) { + for(Map.Entry entry: batchGroups.entrySet()) { + _batchIds.put(new GlobalStreamId(component, entry.getKey()), entry.getValue()); + } + } + + + @SuppressWarnings({"unchecked", "rawtypes"}) + private static class SpoutComponent { + public Object spout; + public Integer parallelism; + public List> componentConfs = new ArrayList<>(); + String batchGroupId; + String streamName; + + public SpoutComponent(Object spout, String streamName, Integer parallelism, String batchGroupId) { + this.spout = spout; + this.streamName = streamName; + this.parallelism = parallelism; + this.batchGroupId = batchGroupId; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } + } + + private static class TransactionalSpoutComponent extends SpoutComponent { + public String commitStateId; + + public TransactionalSpoutComponent(Object spout, String streamName, Integer parallelism, String commitStateId, String batchGroupId) { + super(spout, streamName, parallelism, batchGroupId); + this.commitStateId = commitStateId; + } + + @Override + public String toString() { + return 
ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + } + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private static class Component { + public ITridentBatchBolt bolt; + public Integer parallelism; + public List declarations = new ArrayList<>(); + public List> componentConfs = new ArrayList<>(); + public Set committerBatches; + + public Component(ITridentBatchBolt bolt, Integer parallelism,Set committerBatches) { + this.bolt = bolt; + this.parallelism = parallelism; + this.committerBatches = committerBatches; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + } + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + Map> getBoltBatchToComponentSubscriptions(String id) { + Map> ret = new HashMap(); + for(GlobalStreamId s: getBoltSubscriptionStreams(id)) { + String b = _batchIds.get(s); + if(!ret.containsKey(b)) ret.put(b, new HashSet()); + ret.get(b).add(s.get_componentId()); + } + return ret; + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + List getBoltSubscriptionStreams(String id) { + List ret = new ArrayList(); + Component c = _bolts.get(id); + for(InputDeclaration d: c.declarations) { + ret.add(new GlobalStreamId(d.getComponent(), d.getStream())); + } + return ret; + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private static interface InputDeclaration { + void declare(InputDeclarer declarer); + String getComponent(); + String getStream(); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private static class SpoutDeclarerImpl extends BaseConfigurationDeclarer implements SpoutDeclarer { + SpoutComponent _component; + + public SpoutDeclarerImpl(SpoutComponent component) { + _component = component; + } + + @Override + public SpoutDeclarer addConfigurations(Map conf) { + _component.componentConfs.add(conf); + return this; + } + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private static class BoltDeclarerImpl extends BaseConfigurationDeclarer implements BoltDeclarer { + Component _component; + + public BoltDeclarerImpl(Component component) { + _component = component; + } + + @Override + public BoltDeclarer fieldsGrouping(final String component, final Fields fields) { + addDeclaration(new InputDeclaration() { + @Override + public void declare(InputDeclarer declarer) { + declarer.fieldsGrouping(component, fields); + } + + @Override + public String getComponent() { + return component; + } + + @Override + public String getStream() { + return null; + } + }); + return this; + } + + @Override + public BoltDeclarer fieldsGrouping(final String component, final String streamId, final Fields fields) { + addDeclaration(new InputDeclaration() { + @Override + public void declare(InputDeclarer declarer) { + declarer.fieldsGrouping(component, streamId, fields); + } + + @Override + public String getComponent() { + return component; + } + + @Override + public String getStream() { + return streamId; + } + }); + return this; + } + + @Override + public BoltDeclarer globalGrouping(final String component) { + addDeclaration(new InputDeclaration() { + @Override + public void declare(InputDeclarer declarer) { + declarer.globalGrouping(component); + } + + @Override + public String getComponent() { + return component; + } + + @Override + public String getStream() { + return null; + } + }); + return this; + } + + @Override + public BoltDeclarer globalGrouping(final String component, final String streamId) { + addDeclaration(new InputDeclaration() { + @Override + public void 
declare(InputDeclarer declarer) { + declarer.globalGrouping(component, streamId); + } + + @Override + public String getComponent() { + return component; + } + + @Override + public String getStream() { + return streamId; + } + }); + return this; + } + + @Override + public BoltDeclarer shuffleGrouping(final String component) { + addDeclaration(new InputDeclaration() { + @Override + public void declare(InputDeclarer declarer) { + declarer.shuffleGrouping(component); + } + + @Override + public String getComponent() { + return component; + } + + @Override + public String getStream() { + return null; + } + }); + return this; + } + + @Override + public BoltDeclarer shuffleGrouping(final String component, final String streamId) { + addDeclaration(new InputDeclaration() { + @Override + public void declare(InputDeclarer declarer) { + declarer.shuffleGrouping(component, streamId); + } + + @Override + public String getComponent() { + return component; + } + + @Override + public String getStream() { + return streamId; + } + }); + return this; + } + + @Override + public BoltDeclarer localOrShuffleGrouping(final String component) { + addDeclaration(new InputDeclaration() { + @Override + public void declare(InputDeclarer declarer) { + declarer.localOrShuffleGrouping(component); + } + + @Override + public String getComponent() { + return component; + } + + @Override + public String getStream() { + return null; + } + }); + return this; + } + + @Override + public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) { + addDeclaration(new InputDeclaration() { + @Override + public void declare(InputDeclarer declarer) { + declarer.localOrShuffleGrouping(component, streamId); + } + + @Override + public String getComponent() { + return component; + } + + @Override + public String getStream() { + return streamId; + } + }); + return this; + } + + @Override + public BoltDeclarer noneGrouping(final String component) { + addDeclaration(new InputDeclaration() { + @Override + public void declare(InputDeclarer declarer) { + declarer.noneGrouping(component); + } + + @Override + public String getComponent() { + return component; + } + + @Override + public String getStream() { + return null; + } + }); + return this; + } + + @Override + public BoltDeclarer noneGrouping(final String component, final String streamId) { + addDeclaration(new InputDeclaration() { + @Override + public void declare(InputDeclarer declarer) { + declarer.noneGrouping(component, streamId); + } + + @Override + public String getComponent() { + return component; + } + + @Override + public String getStream() { + return streamId; + } + }); + return this; + } + + @Override + public BoltDeclarer allGrouping(final String component) { + addDeclaration(new InputDeclaration() { + @Override + public void declare(InputDeclarer declarer) { + declarer.allGrouping(component); + } + + @Override + public String getComponent() { + return component; + } + + @Override + public String getStream() { + return null; + } + }); + return this; + } + + @Override + public BoltDeclarer allGrouping(final String component, final String streamId) { + addDeclaration(new InputDeclaration() { + @Override + public void declare(InputDeclarer declarer) { + declarer.allGrouping(component, streamId); + } + + @Override + public String getComponent() { + return component; + } + + @Override + public String getStream() { + return streamId; + } + }); + return this; + } + + @Override + public BoltDeclarer directGrouping(final String component) { + addDeclaration(new 
InputDeclaration() { + @Override + public void declare(InputDeclarer declarer) { + declarer.directGrouping(component); + } + + @Override + public String getComponent() { + return component; + } + + @Override + public String getStream() { + return null; + } + }); + return this; + } + + @Override + public BoltDeclarer directGrouping(final String component, final String streamId) { + addDeclaration(new InputDeclaration() { + @Override + public void declare(InputDeclarer declarer) { + declarer.directGrouping(component, streamId); + } + + @Override + public String getComponent() { + return component; + } + + @Override + public String getStream() { + return streamId; + } + }); + return this; + } + + @Override + public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) { + return customGrouping(componentId, new PartialKeyGrouping(fields)); + } + + @Override + public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) { + return customGrouping(componentId, streamId, new PartialKeyGrouping(fields)); + } + + @Override + public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) { + addDeclaration(new InputDeclaration() { + @Override + public void declare(InputDeclarer declarer) { + declarer.customGrouping(component, grouping); + } + + @Override + public String getComponent() { + return component; + } + + @Override + public String getStream() { + return null; + } + }); + return this; + } + + @Override + public BoltDeclarer customGrouping(final String component, final String streamId, final CustomStreamGrouping grouping) { + addDeclaration(new InputDeclaration() { + @Override + public void declare(InputDeclarer declarer) { + declarer.customGrouping(component, streamId, grouping); + } + + @Override + public String getComponent() { + return component; + } + + @Override + public String getStream() { + return streamId; + } + }); + return this; + } + + @Override + public BoltDeclarer grouping(final GlobalStreamId stream, final Grouping grouping) { + addDeclaration(new InputDeclaration() { + @Override + public void declare(InputDeclarer declarer) { + declarer.grouping(stream, grouping); + } + + @Override + public String getComponent() { + return stream.get_componentId(); + } + + @Override + public String getStream() { + return stream.get_streamId(); + } + }); + return this; + } + + private void addDeclaration(InputDeclaration declaration) { + _component.declarations.add(declaration); + } + + @Override + public BoltDeclarer addConfigurations(Map conf) { + _component.componentConfs.add(conf); + return this; + } + } +} diff --git a/heron/storm/src/java/org/apache/storm/utils/Utils.java b/heron/storm/src/java/org/apache/storm/utils/Utils.java index d67af5ed552..1183bfbcc36 100644 --- a/heron/storm/src/java/org/apache/storm/utils/Utils.java +++ b/heron/storm/src/java/org/apache/storm/utils/Utils.java @@ -18,14 +18,45 @@ package org.apache.storm.utils; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.ObjectInputStream; +import java.net.URL; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.commons.io.input.ClassLoaderObjectInputStream; +import 
org.apache.commons.lang.StringUtils; +import org.apache.storm.shade.org.apache.curator.ensemble.exhibitor.DefaultExhibitorRestClient; +import org.apache.storm.shade.org.apache.curator.ensemble.exhibitor.ExhibitorEnsembleProvider; +import org.apache.storm.shade.org.apache.curator.ensemble.exhibitor.Exhibitors; +import org.apache.storm.shade.org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework; +import org.apache.storm.Config; +import org.apache.storm.shade.org.apache.zookeeper.data.ACL; +import org.apache.storm.thrift.TBase; +import org.apache.storm.thrift.TDeserializer; +import org.apache.storm.thrift.TException; +import org.apache.storm.thrift.TSerializer; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; + import com.twitter.heron.common.basics.TypeUtils; // import org.json.simple.JSONValue; +@SuppressWarnings({"unchecked", "rawtypes"}) public final class Utils { public static final String DEFAULT_STREAM_ID = com.twitter.heron.api.utils.Utils.DEFAULT_STREAM_ID; @@ -92,10 +123,188 @@ public static byte[] serialize(Object obj) { return com.twitter.heron.api.utils.Utils.serialize(obj); } + public static byte[] javaSerialize(Object obj) { + return serialize(obj); + } + public static Object deserialize(byte[] serialized) { return com.twitter.heron.api.utils.Utils.deserialize(serialized); } + private static ThreadLocal threadSer = new ThreadLocal(); + + public static byte[] thriftSerialize(TBase t) { + try { + TSerializer ser = threadSer.get(); + if (ser == null) { + ser = new TSerializer(); + threadSer.set(ser); + } + return ser.serialize(t); + } catch (TException e) { + throw new RuntimeException(e); + } + } + + public static T thriftDeserialize(Class c, byte[] b) { + try { + return Utils.thriftDeserialize(c, b, 0, b.length); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static T thriftDeserialize(Class c, byte[] b, int offset, int length) { + try { + T ret = (T) c.newInstance(); + TDeserializer des = getDes(); + des.deserialize((TBase) ret, b, offset, length); + return ret; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static ThreadLocal threadDes = new ThreadLocal(); + + private static TDeserializer getDes() { + TDeserializer des = threadDes.get(); + if (des == null) { + des = new TDeserializer(); + threadDes.set(des); + } + return des; + } + + private static ClassLoader cl = null; + + public static T javaDeserialize(byte[] serialized, Class clazz) { + try { + ByteArrayInputStream bis = new ByteArrayInputStream(serialized); + ObjectInputStream ois = null; + if (null == cl) { + ois = new ObjectInputStream(bis); + } else { + // Use custom class loader set in testing environment + ois = new ClassLoaderObjectInputStream(cl, bis); + } + Object ret = ois.readObject(); + ois.close(); + return (T) ret; + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + public static String join(Iterable coll, String sep) { + Iterator it = coll.iterator(); + StringBuilder ret = new StringBuilder(); + while (it.hasNext()) { + ret.append(it.next()); + if (it.hasNext()) { + ret.append(sep); + } + } + return ret.toString(); + } + + public static CuratorFramework newCuratorStarted(Map conf, List servers, + Object port, String root, + ZookeeperAuthInfo auth) { + CuratorFramework ret = newCurator(conf, servers, port, root, auth); + ret.start(); + 
return ret; + } + + public static CuratorFramework newCuratorStarted(Map conf, List servers, Object port, ZookeeperAuthInfo auth) { + CuratorFramework ret = newCurator(conf, servers, port, auth); + ret.start(); + return ret; + } + + public static CuratorFramework newCurator(Map conf, List servers, Object port, ZookeeperAuthInfo auth) { + return newCurator(conf, servers, port, "", auth); + } + + public static CuratorFramework newCurator(Map conf, List servers, Object port, String root, ZookeeperAuthInfo auth) { + List serverPorts = new ArrayList(); + for (String zkServer : servers) { + serverPorts.add(zkServer + ":" + Utils.getInt(port)); + } + String zkStr = StringUtils.join(serverPorts, ",") + root; + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + + setupBuilder(builder, zkStr, conf, auth); + + return builder.build(); + } + + protected static void setupBuilder(CuratorFrameworkFactory.Builder builder, final String zkStr, Map conf, ZookeeperAuthInfo auth) { + List exhibitorServers = getStrings(conf.get(Config.STORM_EXHIBITOR_SERVERS)); + if (!exhibitorServers.isEmpty()) { + // use exhibitor servers + builder.ensembleProvider(new ExhibitorEnsembleProvider( + new Exhibitors(exhibitorServers, Utils.getInt(conf.get(Config.STORM_EXHIBITOR_PORT)), + new Exhibitors.BackupConnectionStringProvider() { + @Override + public String getBackupConnectionString() throws Exception { + // use zk servers as backup if they exist + return zkStr; + } + }), + new DefaultExhibitorRestClient(), + Utils.getString(conf.get(Config.STORM_EXHIBITOR_URIPATH)), + Utils.getInt(conf.get(Config.STORM_EXHIBITOR_POLL)), + new StormBoundedExponentialBackoffRetry( + Utils.getInt(conf.get(Config.STORM_EXHIBITOR_RETRY_INTERVAL)), + Utils.getInt(conf.get(Config.STORM_EXHIBITOR_RETRY_INTERVAL_CEILING)), + Utils.getInt(conf.get(Config.STORM_EXHIBITOR_RETRY_TIMES))))); + } else { + builder.connectString(zkStr); + } + builder + .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT))) + .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT))) + .retryPolicy(new StormBoundedExponentialBackoffRetry( + Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)), + Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)), + Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)))); + + if (auth != null && auth.scheme != null && auth.payload != null) { + builder.authorization(auth.scheme, auth.payload); + } + } + + public static List getStrings(final Object o) { + if (o == null) { + return new ArrayList(); + } else if (o instanceof String) { + return new ArrayList() { + private static final long serialVersionUID = -2182685021645675803L; + { add((String) o); }}; + } else if (o instanceof Collection) { + List answer = new ArrayList(); + for (Object v : (Collection) o) { + answer.add(v.toString()); + } + return answer; + } else { + throw new IllegalArgumentException("Don't know how to convert to string list"); + } + } + + public static String getString(Object o) { + if (null == o) { + throw new IllegalArgumentException("Don't know how to convert null to String"); + } + return o.toString(); + } + public static List getWorkerACL(Map conf) { + return null; //TODO: implement ACL support + } + public static Integer getInt(Object o) { return TypeUtils.getInteger(o); } @@ -115,4 +324,97 @@ public static byte[] toByteArray(ByteBuffer buffer) { public static T get(Map m, S key, T defaultValue) { return com.twitter.heron.api.utils.Utils.get(m, key, 
defaultValue); } + + public static Map readStormConfig() { + Map ret = readDefaultConfig(); + String confFile = System.getProperty("storm.conf.file"); + Map storm; + if (confFile == null || confFile.equals("")) { + storm = findAndReadConfigFile("storm.yaml", false); + } else { + storm = findAndReadConfigFile(confFile, true); + } + ret.putAll(storm); + ret.putAll(readCommandLineOpts()); + return ret; + } + + public static Map readDefaultConfig() { + return findAndReadConfigFile("defaults.yaml", true); + } + + private static Map findAndReadConfigFile(String name, boolean mustExist) { + InputStream in = null; + boolean confFileEmpty = false; + try { + in = getConfigFileInputStream(name); + if (null != in) { + Yaml yaml = new Yaml(new SafeConstructor()); + Map ret = (Map) yaml.load(new InputStreamReader(in)); + if (null != ret) { + return new HashMap(ret); + } else { + confFileEmpty = true; + } + } + + if (mustExist) { + if(confFileEmpty) + throw new RuntimeException("Config file " + name + " doesn't have any valid storm configs"); + else + throw new RuntimeException("Could not find config file on classpath " + name); + } else { + return new HashMap(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + if (null != in) { + try { + in.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + + private static InputStream getConfigFileInputStream(String configFilePath) + throws IOException { + if (null == configFilePath) { + throw new IOException( + "Could not find config file, name not specified"); + } + + HashSet resources = new HashSet<>(findResources(configFilePath)); + if (resources.isEmpty()) { + File configFile = new File(configFilePath); + if (configFile.exists()) { + return new FileInputStream(configFile); + } + } else if (resources.size() > 1) { + throw new IOException( + "Found multiple " + configFilePath + + " resources. You're probably bundling the Storm jars with your topology jar. " + + resources); + } else { +// LOG.debug("Using "+configFilePath+" from resources"); + URL resource = resources.iterator().next(); + return resource.openStream(); + } + return null; + } + + private static List findResources(String name) { + try { + Enumeration resources = Thread.currentThread().getContextClassLoader().getResources(name); + List ret = new ArrayList<>(); + while (resources.hasMoreElements()) { + ret.add(resources.nextElement()); + } + return ret; + } catch(IOException e) { + throw new RuntimeException(e); + } + } } From 5306bc3c6bd504f6cf95614bce4ff5bf60682fa1 Mon Sep 17 00:00:00 2001 From: Bill Graham Date: Mon, 10 Apr 2017 11:00:01 -0700 Subject: [PATCH 2/6] clean up readme --- .../storm/src/java/org/apache/storm/trident/README.md | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/heron/storm/src/java/org/apache/storm/trident/README.md b/heron/storm/src/java/org/apache/storm/trident/README.md index 6b3e784343a..647863b1b1c 100644 --- a/heron/storm/src/java/org/apache/storm/trident/README.md +++ b/heron/storm/src/java/org/apache/storm/trident/README.md @@ -5,12 +5,14 @@ need to either support direct grouping in Heron, or refactor Trident to not requ possible). 
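For context on why direct grouping matters here: TridentBoltExecutor (added above) reports, on a per-batch-group coordination stream, how many tuples it emitted to each specific downstream task, and a batch completes only once those counts reconcile. Below is a minimal, self-contained sketch of that bookkeeping; it is illustrative only and not part of the patch.

```
import java.util.HashMap;
import java.util.Map;

// Sketch of TridentBoltExecutor's per-batch accounting: a batch finishes
// only after every expected [id, count] coordination report has arrived
// and the summed counts match the data tuples actually received.
public class CoordCountSketch {
  static final class Tracked {
    int reportedTasks;        // coordination reports received so far
    int expectedTaskReports;  // upstream tasks that must report, from CoordSpec
    int receivedTuples;       // data tuples seen for this batch so far
    int expectedTupleCount;   // sum of the counts carried by [id, count] reports
  }

  public static void main(String[] args) {
    Map<Object, Tracked> batches = new HashMap<>();  // keyed by batch id
    batches.put("batch-1", new Tracked());
    Tracked t = batches.get("batch-1");
    t.expectedTaskReports = 2;                       // two upstream tasks feed this task

    t.receivedTuples += 3;                           // three data tuples arrive
    t.reportedTasks++; t.expectedTupleCount += 1;    // upstream task A: "sent you 1"
    t.reportedTasks++; t.expectedTupleCount += 2;    // upstream task B: "sent you 2"

    boolean finished = t.reportedTasks == t.expectedTaskReports
        && t.receivedTuples == t.expectedTupleCount;
    System.out.println("batch-1 finished: " + finished);  // prints: true
  }
}
```

Because each count is only meaningful to one particular consumer task, these reports are emitted with emitDirect and consumed via directGrouping, which Heron lacked when this prototype was started.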
To run: -`~/bin/heron kill local TridentWordCountTopology && rm -rf ~/.herondata/* -bazel run --config=darwin --verbose_failures -- scripts/packages/heron-client-install.sh --user && \ + +``` +$ ~/bin/heron kill local TridentWordCountTopology && rm -rf ~/.herondata/* +$ bazel run --config=darwin --verbose_failures -- scripts/packages/heron-client-install.sh --user && \ ~/bin/heron submit local ~/.heron/examples/heron-examples.jar \ com.twitter.heron.examples.TridentWordCountTopology TridentWordCountTopology -less ~/.herondata/topologies/local/billg/TridentWordCountTopology/log-files/container_1_b-1_4.log.0 -` +$ less ~/.herondata/topologies/local/billg/TridentWordCountTopology/log-files/container_1_b-1_4.log.0 +``` Current status: - Topology compiles and can be submitted @@ -22,6 +24,7 @@ Current status: - Failures appear in the counters for bolt b-1, but the logs don't show anything and - Correctness is not right due to the following +Issues: 1. Direct grouping needs to be implemented, currently hacking using shuffle grouping (see grouping.cpp) 2. `com.twitter.heron.instance.bolt.BoltOutputCollectorImpl.emitDirect` not supported and hacked to just emit 3. `com.twitter.heron.instance.bolt.BoltOutputCollectorImpl.admitBoltTuple` changed to return task ids From 3c8e98b6031e325af4c377a35d13f95a9255e979 Mon Sep 17 00:00:00 2001 From: Bill Graham Date: Thu, 11 May 2017 15:13:23 -0700 Subject: [PATCH 3/6] Adding support for direct grouping --- .../heron/api/topology/BoltDeclarer.java | 8 +- .../instance/AbstractOutputCollector.java | 20 +++-- .../bolt/BoltOutputCollectorImpl.java | 11 +-- .../spout/SpoutOutputCollectorImpl.java | 9 ++- heron/instance/tests/java/BUILD | 2 + .../heron/grouping/EmitDirectBoltTest.java | 48 ++++++++++++ .../grouping/EmitDirectRoundRobinBolt.java | 70 ++++++++++++++++++ .../grouping/EmitDirectRoundRobinSpout.java | 61 +++++++++++++++ .../heron/grouping/EmitDirectSpoutTest.java | 46 ++++++++++++ .../com/twitter/heron/resource/TestSpout.java | 9 ++- heron/simulator/src/java/StateExample.java | 74 +++++++++++++++++++ heron/stmgr/src/cpp/BUILD | 2 + .../src/cpp/grouping/direct-grouping.cpp | 44 +++++++++++ .../stmgr/src/cpp/grouping/direct-grouping.h | 41 ++++++++++ heron/stmgr/src/cpp/grouping/grouping.cpp | 4 +- 15 files changed, 428 insertions(+), 21 deletions(-) create mode 100644 heron/instance/tests/java/com/twitter/heron/grouping/EmitDirectBoltTest.java create mode 100644 heron/instance/tests/java/com/twitter/heron/grouping/EmitDirectRoundRobinBolt.java create mode 100644 heron/instance/tests/java/com/twitter/heron/grouping/EmitDirectRoundRobinSpout.java create mode 100644 heron/instance/tests/java/com/twitter/heron/grouping/EmitDirectSpoutTest.java create mode 100644 heron/simulator/src/java/StateExample.java create mode 100644 heron/stmgr/src/cpp/grouping/direct-grouping.cpp create mode 100644 heron/stmgr/src/cpp/grouping/direct-grouping.h diff --git a/heron/api/src/java/com/twitter/heron/api/topology/BoltDeclarer.java b/heron/api/src/java/com/twitter/heron/api/topology/BoltDeclarer.java index 2d786b199d7..820fb44ed3c 100644 --- a/heron/api/src/java/com/twitter/heron/api/topology/BoltDeclarer.java +++ b/heron/api/src/java/com/twitter/heron/api/topology/BoltDeclarer.java @@ -151,8 +151,12 @@ public BoltDeclarer directGrouping(String componentName) { } public BoltDeclarer directGrouping(String componentName, String streamId) { - // TODO:- revisit this - throw new RuntimeException("direct Grouping not implemented"); + TopologyAPI.InputStream.Builder bldr = 
TopologyAPI.InputStream.newBuilder(); + bldr.setStream( + TopologyAPI.StreamId.newBuilder().setId(streamId).setComponentName(componentName)); + bldr.setGtype(TopologyAPI.Grouping.DIRECT); + bldr.setType(TopologyAPI.CustomGroupingObjectType.JAVA_OBJECT); + return grouping(bldr); } public BoltDeclarer customGrouping(String componentName, CustomStreamGrouping grouping) { diff --git a/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java b/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java index 890ee9f85c1..ec6849450c0 100644 --- a/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java +++ b/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java @@ -13,6 +13,7 @@ // limitations under the License. package com.twitter.heron.instance; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -93,7 +94,8 @@ public long getTotalTuplesEmitted() { } protected HeronTuples.HeronDataTuple.Builder initTupleBuilder(String streamId, - List tuple) { + List tuple, + Integer taskId) { // Start construct the data tuple HeronTuples.HeronDataTuple.Builder builder = HeronTuples.HeronDataTuple.newBuilder(); @@ -101,15 +103,21 @@ protected HeronTuples.HeronDataTuple.Builder initTupleBuilder(String streamId, builder.setKey(0); List customGroupingTargetTaskIds = null; - if (!helper.isCustomGroupingEmpty()) { + if (taskId != null) { + // TODO: somehow assert that the input stream of the downstream bolt was configured + // with directGrouping + + customGroupingTargetTaskIds = new ArrayList<>(); + customGroupingTargetTaskIds.add(taskId); + } else if (!helper.isCustomGroupingEmpty()) { // customGroupingTargetTaskIds will be null if this stream is not CustomStreamGrouping customGroupingTargetTaskIds = helper.chooseTasksForCustomStreamGrouping(streamId, tuple); + } - if (customGroupingTargetTaskIds != null) { - // It is a CustomStreamGrouping - builder.addAllDestTaskIds(customGroupingTargetTaskIds); - } + if (customGroupingTargetTaskIds != null) { + // It is a CustomStreamGrouping + builder.addAllDestTaskIds(customGroupingTargetTaskIds); } // Invoke user-defined emit task hook diff --git a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java index ffa58fb3e5f..cb68a5ebdd7 100644 --- a/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java +++ b/heron/instance/src/java/com/twitter/heron/instance/bolt/BoltOutputCollectorImpl.java @@ -69,13 +69,13 @@ protected BoltOutputCollectorImpl(IPluggableSerializer serializer, @Override public List emit(String streamId, Collection anchors, List tuple) { - return admitBoltTuple(streamId, anchors, tuple); + return admitBoltTuple(streamId, anchors, tuple, null); } @Override public void emitDirect(int taskId, String streamId, Collection anchors, List tuple) { - throw new RuntimeException("emitDirect not supported"); + admitBoltTuple(streamId, anchors, tuple, taskId); } @Override @@ -98,14 +98,15 @@ public void fail(Tuple input) { ///////////////////////////////////////////////////////// private List admitBoltTuple(String streamId, Collection anchors, - List tuple) { + List tuple, + Integer taskId) { if (getPhysicalPlanHelper().isTerminatedComponent()) { - // No need to handle this tuples + // No need to handle this tuple return null; } // Start construct the data tuple - HeronTuples.HeronDataTuple.Builder bldr = 
initTupleBuilder(streamId, tuple); + HeronTuples.HeronDataTuple.Builder bldr = initTupleBuilder(streamId, tuple, taskId); // Set the anchors for a tuple if (anchors != null) { diff --git a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutOutputCollectorImpl.java b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutOutputCollectorImpl.java index 39a975410fb..c1db282c512 100644 --- a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutOutputCollectorImpl.java +++ b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutOutputCollectorImpl.java @@ -81,12 +81,12 @@ protected SpoutOutputCollectorImpl(IPluggableSerializer serializer, @Override public List emit(String streamId, List tuple, Object messageId) { - return admitSpoutTuple(streamId, tuple, messageId); + return admitSpoutTuple(streamId, tuple, messageId, null); } @Override public void emitDirect(int taskId, String streamId, List tuple, Object messageId) { - throw new RuntimeException("emitDirect Not implemented"); + admitSpoutTuple(streamId, tuple, messageId, taskId); } // Log the report error and also send the stack trace to metrics manager. @@ -137,14 +137,15 @@ List retireExpired(long timeout) { // Following private methods are internal implementations ///////////////////////////////////////////////////////// - private List admitSpoutTuple(String streamId, List tuple, Object messageId) { + private List admitSpoutTuple(String streamId, List tuple, + Object messageId, Integer taskId) { // No need to send tuples if it is already terminated if (getPhysicalPlanHelper().isTerminatedComponent()) { return null; } // Start construct the data tuple - HeronTuples.HeronDataTuple.Builder bldr = initTupleBuilder(streamId, tuple); + HeronTuples.HeronDataTuple.Builder bldr = initTupleBuilder(streamId, tuple, taskId); if (messageId != null) { RootTupleInfo tupleInfo = new RootTupleInfo(streamId, messageId); diff --git a/heron/instance/tests/java/BUILD b/heron/instance/tests/java/BUILD index c7432ae4c03..4cdcf5506ea 100644 --- a/heron/instance/tests/java/BUILD +++ b/heron/instance/tests/java/BUILD @@ -19,6 +19,8 @@ java_library( java_tests( test_classes = [ "com.twitter.heron.grouping.CustomGroupingTest", + "com.twitter.heron.grouping.EmitDirectBoltTest", + "com.twitter.heron.grouping.EmitDirectSpoutTest", "com.twitter.heron.instance.bolt.BoltInstanceTest", "com.twitter.heron.instance.spout.ActivateDeactivateTest", "com.twitter.heron.instance.spout.SpoutInstanceTest", diff --git a/heron/instance/tests/java/com/twitter/heron/grouping/EmitDirectBoltTest.java b/heron/instance/tests/java/com/twitter/heron/grouping/EmitDirectBoltTest.java new file mode 100644 index 00000000000..9b90a6390dd --- /dev/null +++ b/heron/instance/tests/java/com/twitter/heron/grouping/EmitDirectBoltTest.java @@ -0,0 +1,48 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
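The tests added below exercise the new emitDirect path end to end. For reference, here is a minimal sketch of how a producer and consumer pair up, written against the Storm API used elsewhere in this patch; the component names "source" and "sink" are illustrative and this code is not part of the patch.

```
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Sketch: the producer declares a direct stream, resolves a concrete task id
// of the consumer, and targets it explicitly; the consumer subscribes with
// directGrouping so it only receives tuples addressed to its own task.
public class EmitDirectSketch {
  static class DirectSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private int target;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.collector = collector;
      // route everything to the first task of the "sink" component
      this.target = context.getComponentTasks("sink").get(0);
    }

    @Override
    public void nextTuple() {
      collector.emitDirect(target, new Values("hello"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declareStream("default", true, new Fields("word")); // direct=true
    }
  }

  static class Sink extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      System.out.println(tuple.getString(0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
  }

  public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("source", new DirectSpout());
    builder.setBolt("sink", new Sink(), 2).directGrouping("source");
    // builder.createTopology() can then be submitted as usual
  }
}
```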
+package com.twitter.heron.grouping; + +import com.twitter.heron.api.topology.TopologyBuilder; +import com.twitter.heron.resource.TestBolt; + +/** + * Tests emit direct of a bolt to a bolt by using a round robin emit direct approach + * from BOLT_A to BOLT_B + */ +public class EmitDirectBoltTest extends AbstractTupleRoutingTest { + + @Override + protected void initBoltA(TopologyBuilder topologyBuilder, + String boltId, String upstreamComponentId) { + topologyBuilder.setBolt(boltId, new EmitDirectRoundRobinBolt(getInitInfoKey(boltId)), 1) + .shuffleGrouping(upstreamComponentId); + } + + @Override + protected void initBoltB(TopologyBuilder topologyBuilder, + String boltId, String upstreamComponentId) { + topologyBuilder.setBolt(boltId, new TestBolt(), 1) + .directGrouping(upstreamComponentId); + } + + @Override + protected Component getComponentToVerify() { + return Component.BOLT_A; + } + + @Override + protected String getExpectedComponentInitInfo() { + return "test-bolt-a+test-bolt-a+default+[2]"; + } +} diff --git a/heron/instance/tests/java/com/twitter/heron/grouping/EmitDirectRoundRobinBolt.java b/heron/instance/tests/java/com/twitter/heron/grouping/EmitDirectRoundRobinBolt.java new file mode 100644 index 00000000000..8a55f0bff42 --- /dev/null +++ b/heron/instance/tests/java/com/twitter/heron/grouping/EmitDirectRoundRobinBolt.java @@ -0,0 +1,70 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package com.twitter.heron.grouping; + +import java.util.List; +import java.util.Map; +import java.util.logging.Logger; + +import com.twitter.heron.api.bolt.OutputCollector; +import com.twitter.heron.api.generated.TopologyAPI; +import com.twitter.heron.api.topology.TopologyContext; +import com.twitter.heron.api.tuple.Tuple; +import com.twitter.heron.api.tuple.Values; +import com.twitter.heron.common.basics.SingletonRegistry; +import com.twitter.heron.resource.TestBolt; + +/** + * Test bolt that uses emitDirect to emit in a round robin pattern + */ +class EmitDirectRoundRobinBolt extends TestBolt { + private static final Logger LOG = Logger.getLogger(EmitDirectRoundRobinBolt.class.getName()); + private static final long serialVersionUID = 5669629363927216006L; + + private static final int EMIT_COUNT = 10; + + private final String[] toSend = new String[]{"A", "B"}; + private OutputCollector outputCollector; + private int emitted = 0; + + private final String initInfoKey; + + EmitDirectRoundRobinBolt(String initInfoKey) { + super(); + this.initInfoKey = initInfoKey; + } + + @Override + public void prepare(Map map, TopologyContext context, OutputCollector collector) { + this.outputCollector = collector; + String componentId = context.getThisComponentId(); + String streamId = context.getThisStreams().iterator().next(); + Map targets = context.getThisTargets().get(streamId); + + List targetTaskIds = context.getComponentTasks(targets.keySet().iterator().next()); + + ((StringBuilder) SingletonRegistry.INSTANCE.getSingleton(this.initInfoKey)) + .append(String.format("%s+%s+%s+%s", componentId, componentId, streamId, targetTaskIds)); + super.prepare(map, context, outputCollector); + for (emitted = 0; emitted < EMIT_COUNT; emitted++) { + execute(null); + } + } + + @Override + public void execute(Tuple tuple) { + String word = toSend[emitted % toSend.length]; + outputCollector.emitDirect(emitted, new Values(word)); + } +} diff --git a/heron/instance/tests/java/com/twitter/heron/grouping/EmitDirectRoundRobinSpout.java b/heron/instance/tests/java/com/twitter/heron/grouping/EmitDirectRoundRobinSpout.java new file mode 100644 index 00000000000..b02b5c07b1b --- /dev/null +++ b/heron/instance/tests/java/com/twitter/heron/grouping/EmitDirectRoundRobinSpout.java @@ -0,0 +1,61 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package com.twitter.heron.grouping; + +import java.util.List; +import java.util.Map; +import java.util.logging.Logger; + +import com.twitter.heron.api.generated.TopologyAPI; +import com.twitter.heron.api.spout.SpoutOutputCollector; +import com.twitter.heron.api.topology.TopologyContext; +import com.twitter.heron.common.basics.SingletonRegistry; +import com.twitter.heron.resource.TestSpout; + +/** + * Test spout that uses emitDirect to emit in a round robin pattern + */ +class EmitDirectRoundRobinSpout extends TestSpout { + private static final Logger LOG = Logger.getLogger(EmitDirectRoundRobinSpout.class.getName()); + private static final long serialVersionUID = 5669629363927216006L; + + private final String initInfoKey; + + EmitDirectRoundRobinSpout(String initInfoKey) { + super(); + this.initInfoKey = initInfoKey; + } + + @Override + public void open(Map map, + TopologyContext context, + SpoutOutputCollector spoutOutputCollector) { + String componentId = context.getThisComponentId(); + String streamId = context.getThisStreams().iterator().next(); + Map targets = context.getThisTargets().get(streamId); + + List targetTaskIds = context.getComponentTasks(targets.keySet().iterator().next()); + + ((StringBuilder) SingletonRegistry.INSTANCE.getSingleton(this.initInfoKey)) + .append(String.format("%s+%s+%s+%s", componentId, componentId, streamId, targetTaskIds)); + super.open(map, context, spoutOutputCollector); + } + + @Override + protected void emit(SpoutOutputCollector collector, List tuple, + Object messageId, int emittedCount) { + LOG.info(String.format("Emit direct tuple %s to %d", tuple, emittedCount)); + collector.emitDirect(emittedCount, tuple, messageId); + } +} diff --git a/heron/instance/tests/java/com/twitter/heron/grouping/EmitDirectSpoutTest.java b/heron/instance/tests/java/com/twitter/heron/grouping/EmitDirectSpoutTest.java new file mode 100644 index 00000000000..64c94abdefa --- /dev/null +++ b/heron/instance/tests/java/com/twitter/heron/grouping/EmitDirectSpoutTest.java @@ -0,0 +1,46 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package com.twitter.heron.grouping; + +import com.twitter.heron.api.topology.TopologyBuilder; +import com.twitter.heron.resource.TestBolt; + +/** + * Tests emit direct of a spout to a bolt by using a round robin emit direct approach + * from SPOUT to BOLT_A + */ +public class EmitDirectSpoutTest extends AbstractTupleRoutingTest { + + @Override + protected void initSpout(TopologyBuilder topologyBuilder, String spoutId) { + topologyBuilder.setSpout(spoutId, new EmitDirectRoundRobinSpout(getInitInfoKey(spoutId)), 1); + } + + @Override + protected void initBoltA(TopologyBuilder topologyBuilder, + String boltId, String upstreamComponentId) { + topologyBuilder.setBolt(boltId, new TestBolt(), 1) + .directGrouping(upstreamComponentId); + } + + @Override + protected Component getComponentToVerify() { + return Component.SPOUT; + } + + @Override + protected String getExpectedComponentInitInfo() { + return "test-spout+test-spout+default+[1]"; + } +} diff --git a/heron/instance/tests/java/com/twitter/heron/resource/TestSpout.java b/heron/instance/tests/java/com/twitter/heron/resource/TestSpout.java index b532ea80e5f..107f67309f6 100644 --- a/heron/instance/tests/java/com/twitter/heron/resource/TestSpout.java +++ b/heron/instance/tests/java/com/twitter/heron/resource/TestSpout.java @@ -15,6 +15,7 @@ package com.twitter.heron.resource; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -92,11 +93,15 @@ public void nextTuple() { // It will emit A, B, A, B, A, B, A, B, A, B if (emitted < EMIT_COUNT) { String word = toSend[emitted % toSend.length]; - outputCollector.emit(new Values(word), MESSAGE_ID); - emitted++; + emit(outputCollector, new Values(word), MESSAGE_ID, emitted++); } } + protected void emit(SpoutOutputCollector collector, + List tuple, Object messageId, int emittedCount) { + collector.emit(tuple, messageId); + } + @Override public void ack(Object o) { AtomicInteger ackCount = diff --git a/heron/simulator/src/java/StateExample.java b/heron/simulator/src/java/StateExample.java new file mode 100644 index 00000000000..ffb06709260 --- /dev/null +++ b/heron/simulator/src/java/StateExample.java @@ -0,0 +1,74 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class StateExample { + + interface TypedState { + void put(K key, V value); + V get(K key); + Set keySet(); + } + + interface ITypedStatefulComponent { + void initState(TypedState state); + } + + static class MyStateImpl implements TypedState { + private Map map = new HashMap<>(); + + @Override + public void put(Serializable key, Serializable value) { + map.put(key, value); + } + + @Override + public Serializable get(Serializable key) { + return map.get(key); + } + + @Override + public Set keySet() { + return map.keySet(); + } + } + + static class MyStatefulBolt implements ITypedStatefulComponent { + @Override + public void initState(TypedState state) { + for (String key : state.keySet()) { + System.out.printf("key: %s, value %d", key, state.get(key)); + } + } + } + + static TypedState loadStateFromDisk() { + Serializable key = "String"; + Serializable value = 2; + + TypedState state = new MyStateImpl(); + state.put(key, value); + return state; + } + + public static void main(String[] args) { + MyStatefulBolt bolt = new MyStatefulBolt(); + bolt.initState(loadStateFromDisk()); + } +} + diff --git a/heron/stmgr/src/cpp/BUILD b/heron/stmgr/src/cpp/BUILD index cda092d0f4d..7892a70dddb 100644 --- a/heron/stmgr/src/cpp/BUILD +++ b/heron/stmgr/src/cpp/BUILD @@ -5,6 +5,7 @@ cc_library( srcs = [ "grouping/all-grouping.cpp", "grouping/custom-grouping.cpp", + "grouping/direct-grouping.cpp", "grouping/fields-grouping.cpp", "grouping/grouping.cpp", "grouping/lowest-grouping.cpp", @@ -14,6 +15,7 @@ cc_library( "grouping/grouping.h", "grouping/all-grouping.h", "grouping/custom-grouping.h", + "grouping/direct-grouping.h", "grouping/fields-grouping.h", "grouping/lowest-grouping.h", "grouping/shuffle-grouping.h", diff --git a/heron/stmgr/src/cpp/grouping/direct-grouping.cpp b/heron/stmgr/src/cpp/grouping/direct-grouping.cpp new file mode 100644 index 00000000000..557b9509206 --- /dev/null +++ b/heron/stmgr/src/cpp/grouping/direct-grouping.cpp @@ -0,0 +1,44 @@ +/* + * Copyright 2015 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "grouping/direct-grouping.h" +#include +#include +#include +#include +#include "grouping/grouping.h" +#include "proto/messages.h" +#include "basics/basics.h" +#include "errors/errors.h" +#include "threads/threads.h" +#include "network/network.h" + + +namespace heron { +namespace stmgr { + +DirectGrouping::DirectGrouping(const std::vector& _task_ids) : Grouping(_task_ids) {} + +DirectGrouping::~DirectGrouping() {} + +void DirectGrouping::GetListToSend(const proto::system::HeronDataTuple&, std::vector&) { + // Stmgr does not do the direct grouping. 
diff --git a/heron/stmgr/src/cpp/grouping/direct-grouping.h b/heron/stmgr/src/cpp/grouping/direct-grouping.h
new file mode 100644
index 00000000000..6da2f67e985
--- /dev/null
+++ b/heron/stmgr/src/cpp/grouping/direct-grouping.h
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2015 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_CPP_SVCS_STMGR_SRC_GROUPING_DIRECT_GROUPING_H_
+#define SRC_CPP_SVCS_STMGR_SRC_GROUPING_DIRECT_GROUPING_H_
+
+#include <list>
+#include <vector>
+#include "grouping/grouping.h"
+#include "proto/messages.h"
+#include "basics/basics.h"
+
+namespace heron {
+namespace stmgr {
+
+class DirectGrouping : public Grouping {
+ public:
+  explicit DirectGrouping(const std::vector<sp_int32>& _task_ids);
+  virtual ~DirectGrouping();
+
+  virtual void GetListToSend(const proto::system::HeronDataTuple& _tuple,
+                             std::vector<sp_int32>& _return);
+};
+
+}  // namespace stmgr
+}  // namespace heron
+
+#endif  // SRC_CPP_SVCS_STMGR_SRC_GROUPING_DIRECT_GROUPING_H_
diff --git a/heron/stmgr/src/cpp/grouping/grouping.cpp b/heron/stmgr/src/cpp/grouping/grouping.cpp
index b84f32f7f73..3b30190323d 100644
--- a/heron/stmgr/src/cpp/grouping/grouping.cpp
+++ b/heron/stmgr/src/cpp/grouping/grouping.cpp
@@ -19,6 +19,7 @@
 #include <iostream>
 #include <list>
 #include <vector>
+#include "grouping/direct-grouping.h"
 #include "grouping/shuffle-grouping.h"
 #include "grouping/fields-grouping.h"
 #include "grouping/all-grouping.h"
@@ -68,8 +69,7 @@ Grouping* Grouping::Create(proto::api::Grouping grouping_, const proto::api::Inp
     }
 
     case proto::api::DIRECT: {
-      LOG(FATAL) << "Direct grouping not supported";
-      return NULL;  // keep compiler happy
+      return new DirectGrouping(_task_ids);
       break;
     }
 
From 9f787da66ad89ddfa8419dbe1c4609cd6b37d77e Mon Sep 17 00:00:00 2001
From: Bill Graham
Date: Thu, 11 May 2017 22:26:07 -0700
Subject: [PATCH 4/6] removing mistaken commit

---
 heron/simulator/src/java/StateExample.java | 74 ----------------------
 1 file changed, 74 deletions(-)
 delete mode 100644 heron/simulator/src/java/StateExample.java

diff --git a/heron/simulator/src/java/StateExample.java b/heron/simulator/src/java/StateExample.java
deleted file mode 100644
index ffb06709260..00000000000
--- a/heron/simulator/src/java/StateExample.java
+++ /dev/null
@@ -1,74 +0,0 @@
-// Copyright 2017 Twitter. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-public class StateExample {
-
-  interface TypedState<K extends Serializable, V extends Serializable> {
-    void put(K key, V value);
-    V get(K key);
-    Set<K> keySet();
-  }
-
-  interface ITypedStatefulComponent<K extends Serializable, V extends Serializable> {
-    void initState(TypedState<K, V> state);
-  }
-
-  static class MyStateImpl<K extends Serializable, V extends Serializable>
-      implements TypedState<K, V> {
-    private Map<K, V> map = new HashMap<>();
-
-    @Override
-    public void put(K key, V value) {
-      map.put(key, value);
-    }
-
-    @Override
-    public V get(K key) {
-      return map.get(key);
-    }
-
-    @Override
-    public Set<K> keySet() {
-      return map.keySet();
-    }
-  }
-
-  static class MyStatefulBolt implements ITypedStatefulComponent<String, Integer> {
-    @Override
-    public void initState(TypedState<String, Integer> state) {
-      for (String key : state.keySet()) {
-        System.out.printf("key: %s, value %d%n", key, state.get(key));
-      }
-    }
-  }
-
-  static TypedState<String, Integer> loadStateFromDisk() {
-    String key = "String";
-    Integer value = 2;
-
-    TypedState<String, Integer> state = new MyStateImpl<>();
-    state.put(key, value);
-    return state;
-  }
-
-  public static void main(String[] args) {
-    MyStatefulBolt bolt = new MyStatefulBolt();
-    bolt.initState(loadStateFromDisk());
-  }
-}
-

From 334262d7c87260022d86f5ad61cdbaa44f8132a7 Mon Sep 17 00:00:00 2001
From: Bill Graham
Date: Fri, 12 May 2017 14:44:10 -0700
Subject: [PATCH 5/6] Updating to reflect current status

---
 .../twitter/heron/instance/HeronInstance.java |  2 +-
 heron/storm/src/java/BUILD                    |  2 ++
 .../java/org/apache/storm/trident/README.md   | 33 ++++++++++---------
 3 files changed, 20 insertions(+), 17 deletions(-)

diff --git a/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java b/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java
index 8004782b6a4..e5c0fe49f7a 100644
--- a/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java
+++ b/heron/instance/src/java/com/twitter/heron/instance/HeronInstance.java
@@ -159,7 +159,7 @@ public static void main(String[] args) throws IOException {
 
     // Init the logging setting and redirect the stdout and stderr to logging
     // For now we just set the logging level as INFO; later we may accept an argument to set it.
-    Level loggingLevel = Level.INFO;
+    Level loggingLevel = Level.FINE;
 
     String loggingDir = systemConfig.getHeronLoggingDirectory();
 
     // Log to file and TMaster
diff --git a/heron/storm/src/java/BUILD b/heron/storm/src/java/BUILD
index 6b076e69397..2efc128604a 100644
--- a/heron/storm/src/java/BUILD
+++ b/heron/storm/src/java/BUILD
@@ -12,6 +12,8 @@ storm_deps_files = [
     "@commons_lang_commons_lang//jar",  # added for trident prototype
     "@org_apache_storm_core//jar",  # added for trident prototype
     "@org_clojure_clojure//jar",  # added for trident prototype
+    "@org_slf4j_slf4j_api//jar",  # added for trident prototype
+    "@org_slf4j_slf4j_jdk14//jar",  # added for trident prototype
     "@org_yaml_snakeyaml//jar",  # added for trident prototype
     "@org_ow2_asm_asm_all//jar",  # added for trident prototype, required at runtime
     "//third_party/java:kryo-neverlink",
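The two slf4j jars are what make storm-core's logging visible at all: slf4j-api satisfies storm-core's compile-time dependency, and the slf4j-jdk14 binding routes those calls into java.util.logging, where Heron's LoggingHelper configuration (and the Level.FINE default above) applies. A minimal illustration of that routing; the class name here is hypothetical:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Slf4jRoutingExample {
  // With slf4j-jdk14 on the classpath, this logger is backed by a
  // java.util.logging.Logger of the same name, so JUL levels and handlers
  // configured by LoggingHelper govern its output.
  private static final Logger LOG = LoggerFactory.getLogger(Slf4jRoutingExample.class);

  public static void main(String[] args) {
    LOG.info("published through java.util.logging");
    LOG.debug("visible only if the JUL level is FINE or lower");
  }
}
```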
diff --git a/heron/storm/src/java/org/apache/storm/trident/README.md b/heron/storm/src/java/org/apache/storm/trident/README.md
index 647863b1b1c..5999bd0164e 100644
--- a/heron/storm/src/java/org/apache/storm/trident/README.md
+++ b/heron/storm/src/java/org/apache/storm/trident/README.md
@@ -1,12 +1,13 @@
 Hackweek prototype to get a sample trident topology working.
 
-tl;dr; Heron does not support direct grouping, which Trident requires. To get Trident to work, we
-need to either support direct grouping in Heron, or refactor Trident to not require it (if that's
-possible).
+tl;dr; TridentWordCountTopologyHeron runs and tuples are being transmitted. Bolt b-1 receives
+sentances and splits them into words and counts. Bolt b-0 does not seem to receive and aggregate
+them. We need to understand why.
 
 To run:
 
 ```
+$ cd path/to/zookeeper && bin/zkServer.sh start
 $ ~/bin/heron kill local TridentWordCountTopology && rm -rf ~/.herondata/*
 $ bazel run --config=darwin --verbose_failures -- scripts/packages/heron-client-install.sh --user && \
   ~/bin/heron submit local ~/.heron/examples/heron-examples.jar \
@@ -17,26 +18,26 @@ $ less ~/.herondata/topologies/local/billg/TridentWordCountTopology/log-files/co
 
 Current status:
 - Topology compiles and can be submitted
 - DAG and streams appears to be correctly represented
-- Tuple routing is incorrect due to hacks (see below)
+- Aggregation does not appear to be happening on the terminal bolt (b-0)
 - Trident/Storm do not provide reasonable defaults to configs and instances fails violently when
   expected configs are not set. See TridentWordCountTopology.
 - Many methods have been added/hacked to get the topology to run, but
-- Failures appear in the counters for bolt b-1, but the logs don't show anything and
-- Correctness is not right due to the following
+- Failures on stream $coord-bg0 appear in the counters for bolt b-1, but the logs don't show anything
 
 Issues:
-1. Direct grouping needs to be implemented, currently hacking using shuffle grouping (see grouping.cpp)
-2. `com.twitter.heron.instance.bolt.BoltOutputCollectorImpl.emitDirect` not supported and hacked to just emit
-3. `com.twitter.heron.instance.bolt.BoltOutputCollectorImpl.admitBoltTuple` changed to return task ids
-4. `BoltDeclarerImpl.grouping(GlobalStreamId id, Grouping grouping)` doesn't support `CUSTOM_SERIALIZED` properly
-5. GeneralTopologyContext does a bunch of janky stuff with NoValueMap, for callers who need keySets only
-6. Zookeeper acls are not implemented in Utils.
+1. `com.twitter.heron.instance.bolt.BoltOutputCollectorImpl.admitBoltTuple` changed to return task ids
+2. `BoltDeclarerImpl.grouping(GlobalStreamId id, Grouping grouping)` doesn't support `CUSTOM_SERIALIZED` properly
+3. GeneralTopologyContext does a bunch of janky stuff with NoValueMap, for callers who need keySets only
+4. Zookeeper acls are not implemented in Utils.
 
 TODO:
-- Figure out why bolt b-1 is failing. This is likely because tuples are being mis-routed due to the
-  grouping hacks
-- Understand why direct grouping and `emitDirect` are needed and how to support, remove hacks (see #1, #2 above)
-- Fix `admitBoltTuple` changed to return task ids to return real tuples ids (see #3 above)
+- Figure out why bolt b-1 is failing to process tuples on the $coord-bg0 stream.
+- Bolt b-1 seems to receive sentences on stream s1 and split them into words in the code, but they
+don't seem to be getting to b-0. Understand why: are they being emitted and received and the counters
+are wrong, or are they not emitted?
+- Understand MemoryMapState and see counts getting persisted in it. I suspect this should be done by b-0.
+- Understand why direct grouping and `emitDirect` are needed
+- Fix `admitBoltTuple` (changed to return task ids) to return real tuple ids (see #1 above)
 - Understand why `CUSTOM_SERIALIZED` is needed and how to support (see #4 above)
 - Figure out why `org.apache.storm.trident.topology.TransactionAttempt` is only registered as
   `Config.TOPOLOGY_KRYO_REGISTER` in spouts and not bolts.
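For orientation, the topology under test follows the canonical Trident word count. A sketch of that wiring, per the upstream storm-starter example rather than code in this patch (the `Split` function is defined here for self-containment): the FixedBatchSpout feeds stream s1 (spout-spout1), the each/Split step runs in b-1, and the grouped persistentAggregate into MemoryMapState is what should run in b-0.

```java
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public final class TridentWordCountSketch {

  // Splits each sentence into words; in the Heron run this executes in bolt b-1.
  public static class Split extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
      for (String word : tuple.getString(0).split(" ")) {
        collector.emit(new Values(word));
      }
    }
  }

  public static StormTopology buildTopology() {
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
        new Values("the cow jumped over the moon"),
        new Values("four score and seven years ago"));
    spout.setCycle(true);

    TridentTopology topology = new TridentTopology();
    topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        // The grouped counts should be persisted here, by bolt b-0.
        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
    return topology.build();
  }
}
```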
From 7bb5b457becf45741d5f8b6c2b0b6951dbe6693c Mon Sep 17 00:00:00 2001
From: Bill Graham
Date: Fri, 21 Jul 2017 16:23:26 -0700
Subject: [PATCH 6/6] Improving logging, adding jvm debug hooks and adding
 readme notes.

---
 .../twitter/heron/common/utils/logging/LoggingHelper.java | 1 +
 heron/executor/src/python/heron_executor.py               | 4 +++-
 heron/storm/src/java/org/apache/storm/trident/README.md   | 7 ++++++-
 3 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/heron/common/src/java/com/twitter/heron/common/utils/logging/LoggingHelper.java b/heron/common/src/java/com/twitter/heron/common/utils/logging/LoggingHelper.java
index 03a76617ff1..eccde5b82b7 100644
--- a/heron/common/src/java/com/twitter/heron/common/utils/logging/LoggingHelper.java
+++ b/heron/common/src/java/com/twitter/heron/common/utils/logging/LoggingHelper.java
@@ -76,6 +76,7 @@ public static void loggerInit(Level level, boolean isRedirectStdOutErr, String f
     if (rootLogger.getLevel().intValue() < Level.WARNING.intValue()) {
       // zookeeper logging scares me. if people want this, we can patch to config-drive this
       Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARNING);
+      Logger.getLogger("org.apache.storm.shade.org.apache.zookeeper").setLevel(Level.WARNING);
     }
 
     if (isRedirectStdOutErr) {
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index ddf21c53725..4eab7da9892 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -492,9 +492,11 @@ def _get_java_instance_cmd(self, instance_info):
                       '-XX:+UseConcMarkSweepGC',
                       '-XX:ParallelGCThreads=4',
                       '-Xloggc:log-files/gc.%s.log' % instance_id.replace("$", "")]
-    if global_task_id == -1:  # Used to enable debugging of specific instances
+    if global_task_id == -1:  # Used to enable debugging of specific instances during startup
       instance_cmd =\
         instance_cmd + ["-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"]
+    instance_cmd = instance_cmd + [
+        "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=500%d" % global_task_id]
 
     instance_cmd = instance_cmd + self.instance_jvm_opts.split()
     if component_name in self.component_jvm_opts:
diff --git a/heron/storm/src/java/org/apache/storm/trident/README.md b/heron/storm/src/java/org/apache/storm/trident/README.md
index 5999bd0164e..ca023e75ab7 100644
--- a/heron/storm/src/java/org/apache/storm/trident/README.md
+++ b/heron/storm/src/java/org/apache/storm/trident/README.md
@@ -1,7 +1,7 @@
 Hackweek prototype to get a sample trident topology working.
 
 tl;dr; TridentWordCountTopologyHeron runs and tuples are being transmitted. Bolt b-1 receives
-sentances and splits them into words and counts. Bolt b-0 does not seem to receive and aggregate
+sentences and splits them into words and counts. Bolt b-0 does not seem to receive and aggregate
 them. We need to understand why.
 
 To run:
 
@@ -15,6 +15,11 @@ $ bazel run --config=darwin --verbose_failures -- scripts/packages/heron-client-
 $ less ~/.herondata/topologies/local/billg/TridentWordCountTopology/log-files/container_1_b-1_4.log.0
 ```
 
+Notes:
+- spout-spout1 emits sentences via FixedBatchSpout on stream s1 to bolt b1
+- b1 receives sentences in SubtopologyBolt, which delegates through EachProcessor to Split; the
+  resulting words flow through AppendCollector to AggregateProcessor and GroupedAggregator
+
 Current status:
 - Topology compiles and can be submitted
 - DAG and streams appears to be correctly represented
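One open question in the patch-5 TODO list is why `org.apache.storm.trident.topology.TransactionAttempt` ends up under `Config.TOPOLOGY_KRYO_REGISTER` only for spouts. For reference, a minimal sketch (stock Storm API, not code from this patch) of registering the class topology-wide so spout and bolt instances alike can deserialize batch metadata:

```java
import org.apache.storm.Config;
import org.apache.storm.trident.topology.TransactionAttempt;

public class KryoRegistrationSketch {
  public static void main(String[] args) {
    Config conf = new Config();
    // Appends the class name to Config.TOPOLOGY_KRYO_REGISTER, which each
    // instance reads when building its kryo instance; passing this conf at
    // submit time would make the registration apply to every component.
    conf.registerSerialization(TransactionAttempt.class);
  }
}
```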