Skip to content

Commit

Permalink
Add the ability to simulate streamlet topologies
Browse files Browse the repository at this point in the history
  • Loading branch information
dancollins34 committed Feb 3, 2018
1 parent 56dfd68 commit 1d15f2d
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 9 deletions.
7 changes: 4 additions & 3 deletions heron/api/src/java/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ java_doc(
api_deps_files = \
heron_java_api_proto_files() + [
":classification",
"//heron/common/src/java:basics-java",
"//heron/common/src/java:basics-java"
]

# Low Level Api
Expand All @@ -34,6 +34,7 @@ java_library(
deps = api_deps_files + [
":api-java-low-level",
"//third_party/java:kryo-neverlink",
"//heron/simulator/src/java:simulator-java"
]
)

Expand All @@ -42,13 +43,13 @@ java_library(
name = "api-java-low-level-functional",
javacopts = DOCLINT_HTML_AND_SYNTAX,
srcs = glob(["com/twitter/heron/api/**/*.java", "com/twitter/heron/streamlet/**/*.java"]),
deps = api_deps_files + ["//third_party/java:kryo-neverlink"]
deps = api_deps_files + ["//third_party/java:kryo-neverlink", "//heron/simulator/src/java:simulator-java"]
)

java_binary(
name = "api-unshaded",
srcs = glob(["com/twitter/heron/api/**/*.java", "com/twitter/heron/streamlet/**/*.java"]),
deps = api_deps_files + ["//third_party/java:kryo-neverlink"],
deps = api_deps_files + ["//third_party/java:kryo-neverlink", "//heron/simulator/src/java:simulator-java"],
)

jarjar_binary(
Expand Down
25 changes: 24 additions & 1 deletion heron/api/src/java/com/twitter/heron/streamlet/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ public final class Config implements Serializable {
private final DeliverySemantics deliverySemantics;
private final Serializer serializer;
private com.twitter.heron.api.Config heronConfig;
private final boolean shouldSimulate;
private static final long MB = 1024 * 1024;
private static final long GB = 1024 * MB;


/**
* An enum encapsulating the delivery semantics that can be applied to Heron topologies. The
* options are currently: at most once, at least once, or effectively once.
Expand All @@ -60,6 +62,7 @@ private static class Defaults {
static final long RAM = 100 * MB;
static final DeliverySemantics SEMANTICS = DeliverySemantics.ATMOST_ONCE;
static final Serializer SERIALIZER = Serializer.KRYO;
static final boolean shouldSimulate = false;
}

private Config(Builder builder) {
Expand All @@ -68,6 +71,7 @@ private Config(Builder builder) {
cpu = builder.cpu;
ram = builder.ram;
deliverySemantics = builder.deliverySemantics;
shouldSimulate = builder.shouldSimulate;
}

/**
Expand Down Expand Up @@ -147,6 +151,14 @@ public Serializer getSerializer() {
return serializer;
}

/**
* Gets whether this should be simulated
* @return whether this should be simulated
*/
public boolean shouldSimulate() {
return shouldSimulate;
}

private static com.twitter.heron.api.Config.TopologyReliabilityMode translateSemantics(
DeliverySemantics semantics) {
switch (semantics) {
Expand All @@ -167,13 +179,15 @@ public static final class Builder {
private long ram;
private DeliverySemantics deliverySemantics;
private Serializer serializer;
private boolean shouldSimulate;

private Builder() {
config = Defaults.CONFIG;
cpu = Defaults.CPU;
ram = Defaults.RAM;
deliverySemantics = Defaults.SEMANTICS;
serializer = Serializer.KRYO;
serializer = Defaults.SERIALIZER;
shouldSimulate = Defaults.shouldSimulate;
}

/**
Expand Down Expand Up @@ -250,6 +264,15 @@ public Builder setUserConfig(String key, Object value) {
return this;
}

/**
* Sets whether this topology should be run in the simulator or not
* @param shouldSimulate whether this should be run in the simulator
*/
public Builder setShouldSimulate(boolean shouldSimulate) {
this.shouldSimulate = shouldSimulate;
return this;
}

private void useKryo() {
try {
config.setSerializationClassName(KryoSerializer.class.getName());
Expand Down
20 changes: 15 additions & 5 deletions heron/api/src/java/com/twitter/heron/streamlet/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.twitter.heron.api.exception.AlreadyAliveException;
import com.twitter.heron.api.exception.InvalidTopologyException;
import com.twitter.heron.api.topology.TopologyBuilder;
import com.twitter.heron.simulator.Simulator;
import com.twitter.heron.streamlet.impl.BuilderImpl;

/**
Expand All @@ -36,11 +37,20 @@ public Runner() { }
public void run(String name, Config config, Builder builder) {
BuilderImpl bldr = (BuilderImpl) builder;
TopologyBuilder topologyBuilder = bldr.build();
try {
HeronSubmitter.submitTopology(name, config.getHeronConfig(),
topologyBuilder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException e) {
e.printStackTrace();

if (config.shouldSimulate()) {
new Simulator().submitTopology(
name,
config.getHeronConfig(),
topologyBuilder.createTopology()
);
} else {
try {
HeronSubmitter.submitTopology(name, config.getHeronConfig(),
topologyBuilder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException e) {
e.printStackTrace();
}
}
}
}

0 comments on commit 1d15f2d

Please sign in to comment.