From a9fdd20018604d0fc75632ad69cd033e0c9ab591 Mon Sep 17 00:00:00 2001 From: Ning Wang Date: Thu, 27 Sep 2018 13:34:54 -0700 Subject: [PATCH 01/10] Add custom streamlet operator --- .../org/apache/heron/streamlet/Streamlet.java | 2 + .../heron/streamlet/impl/StreamletImpl.java | 2 + .../impl/operators/CustomOperator.java | 117 ++++++++++++++++++ .../impl/operators/CustomOperatorOutput.java | 88 +++++++++++++ .../impl/operators/ICustomBasicOperator.java | 29 +++++ .../impl/operators/ICustomOperator.java | 30 +++++ 6 files changed, 268 insertions(+) create mode 100644 heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java create mode 100644 heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutput.java create mode 100644 heron/api/src/java/org/apache/heron/streamlet/impl/operators/ICustomBasicOperator.java create mode 100644 heron/api/src/java/org/apache/heron/streamlet/impl/operators/ICustomOperator.java diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java index b5fe93162fd..ace749b5a0f 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java +++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java @@ -22,6 +22,8 @@ import java.util.List; import org.apache.heron.classification.InterfaceStability; +import org.apache.heron.streamlet.impl.operators.ICustomBasicOperator; +import org.apache.heron.streamlet.impl.operators.ICustomOperator; /** * A Streamlet is a (potentially unbounded) ordered collection of tuples. diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java index 066dc743fdc..e013012c2fe 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java @@ -42,6 +42,8 @@ import org.apache.heron.streamlet.Source; import org.apache.heron.streamlet.Streamlet; import org.apache.heron.streamlet.WindowConfig; +import org.apache.heron.streamlet.impl.operators.ICustomBasicOperator; +import org.apache.heron.streamlet.impl.operators.ICustomOperator; import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet; import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet; import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet; diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java new file mode 100644 index 00000000000..27414a77c5b --- /dev/null +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.heron.streamlet.impl.operators; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; + +import org.apache.heron.api.bolt.OutputCollector; +import org.apache.heron.api.topology.TopologyContext; +import org.apache.heron.api.tuple.Tuple; +import org.apache.heron.api.tuple.Values; + +/** + * CustomOperator is the base class for all user defined operators. + * Usage: + * 1. Create user defined operator + * class MyOperator extends CustomOperator { + * public MyOperator() { + * ... + * } + * + * @override + * public CustomOperatorOutput CustomOperatorOutput process(String data) { + * ... + * } + * } + * Note that users can override low level bolt functions like getComponentConfiguration() in order + * to implement more advanced features. + * 2. Use it in Streamlet + * .... + * .CustomOperator(new MyOperator) + * .... + */ +public abstract class CustomOperator + extends StreamletOperator + implements ICustomOperator { + + private OutputCollector outputCollector; + + /** + * Process function to be implemented by all custom operators. + * @param data The data object to be processed + * @return a CustomOperatorOutput that contains process results and other flags. The output is wrapped + * in Optional. When the process failed, return none + */ + public abstract Optional> process(R data); + + /** + * Called when a task for this component is initialized within a worker on the cluster. + * It provides the bolt with the environment in which the bolt executes. + * @param heronConf The Heron configuration for this bolt. This is the configuration provided to the topology merged in with cluster configuration on this machine. + * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc. + * @param collector The collector is used to emit tuples from this bolt. Tuples can be emitted at any time, including the prepare and cleanup methods. The collector is thread-safe and should be saved as an instance variable of this bolt object. + */ + public void prepare(Map heronConf, + TopologyContext context, + OutputCollector collector) { + + this.outputCollector = collector; + } + + /** + * Implementation of execute() function to support type safty in Streamlet API + * The function is responsible for: + * - convert incoming tuple to the specified type + * - call the new process() function which has type safty support + * - emit/ack/fail the results + */ + @SuppressWarnings("unchecked") + @Override + public void execute(Tuple tuple) { + R data = (R) tuple.getValue(0); + Optional> results = process(data); + + if (results.isPresent()) { + Collection anchors = results.get().isAnchored() ? Arrays.asList(tuple) : null; + emitResults(results.get().getData(), anchors); + outputCollector.ack(tuple); + } else { + outputCollector.fail(tuple); + } + } + + /** + * Convert process results to tuples and emit out to the downstream + * @param data results collections with corresponding stream id + * @param anchors anchors to be used when emitting tuples + */ + private void emitResults(Map> data, Collection anchors) { + for (Map.Entry> entry : data.entrySet()) { + String streamId = entry.getKey(); + for (T value: entry.getValue()) { + outputCollector.emit(streamId, anchors, new Values(value)); + } + } + } +} diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutput.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutput.java new file mode 100644 index 00000000000..40f87afbd61 --- /dev/null +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutput.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.heron.streamlet.impl.operators; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.heron.api.utils.Utils; + + + +/** + * CustomOperatorOutput is the class for data returned by CustomOperators' process() function. + * CustomStreamlet is responsible for converting the output to emit/ack/fail is lowlevel API. + * + * Usage: + * Assuming data is already stored in an integer array: data, + * return CustomOperatorOutput.apply(data); + */ +public final class CustomOperatorOutput { + private boolean anchored; // If anchors should be added when emitting tuples? + private Map> data; // Output data to be emitted. + + // Disable constructor. User should use the static functions below to create objects + private CustomOperatorOutput() { + } + + public Map> getData() { + return data; + } + + public boolean isAnchored() { + return anchored; + } + + /* Util static functions. Users should use them to create objects */ + public static Optional> success() { + Map> dataMap = new HashMap>(); + return success(dataMap, true); + } + + public static Optional> success(Collection data) { + return success(data, true); + } + + public static Optional> success(Collection data, + boolean anchored) { + Map> dataMap = new HashMap>(); + dataMap.put(Utils.DEFAULT_STREAM_ID, data); + return success(dataMap, anchored); + } + + public static Optional> success(Map> data) { + return success(data, true); + } + + public static Optional> success(Map> data, + boolean anchored) { + CustomOperatorOutput retval = new CustomOperatorOutput(); + retval.data = data; + retval.anchored = anchored; + return Optional.of(retval); + } + + public static Optional> fail() { + return Optional.empty(); + } +} diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ICustomBasicOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ICustomBasicOperator.java new file mode 100644 index 00000000000..22647030501 --- /dev/null +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ICustomBasicOperator.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.heron.streamlet.impl.operators; + +import org.apache.heron.api.bolt.IBasicBolt; + +/** + * The interface for custom basic operators. It is used for existing bolts (subclasses of IBasicBolt). + */ +public interface ICustomBasicOperator extends IBasicBolt { +} diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ICustomOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ICustomOperator.java new file mode 100644 index 00000000000..4bf6acc93a0 --- /dev/null +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ICustomOperator.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.heron.streamlet.impl.operators; + +import org.apache.heron.api.bolt.IRichBolt; + +/** + * The interface for custom operators: including new user defined operators as well as + * operators based on existing Bolts (subclasses of IRichBolt). + */ +public interface ICustomOperator extends IRichBolt { +} From 8844056fec7a4c7b2c7ca4f1d3e3670029b91135 Mon Sep 17 00:00:00 2001 From: Ning Wang Date: Tue, 16 Oct 2018 09:26:58 -0700 Subject: [PATCH 02/10] Refactor ICustomOperator to IStreamletOperator --- heron/api/src/java/org/apache/heron/streamlet/Streamlet.java | 4 ++-- .../java/org/apache/heron/streamlet/impl/StreamletImpl.java | 5 ++--- .../heron/streamlet/impl/operators/CustomOperator.java | 4 +--- ...CustomBasicOperator.java => IStreamletBasicOperator.java} | 2 +- .../{ICustomOperator.java => IStreamletOperator.java} | 2 +- .../heron/streamlet/impl/streamlets/CustomStreamlet.java | 1 + .../apache/heron/streamlet/impl/streamlets/MapStreamlet.java | 4 ++++ 7 files changed, 12 insertions(+), 10 deletions(-) rename heron/api/src/java/org/apache/heron/streamlet/impl/operators/{ICustomBasicOperator.java => IStreamletBasicOperator.java} (93%) rename heron/api/src/java/org/apache/heron/streamlet/impl/operators/{ICustomOperator.java => IStreamletOperator.java} (94%) diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java index ace749b5a0f..26bd824f7d1 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java +++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java @@ -22,8 +22,8 @@ import java.util.List; import org.apache.heron.classification.InterfaceStability; -import org.apache.heron.streamlet.impl.operators.ICustomBasicOperator; -import org.apache.heron.streamlet.impl.operators.ICustomOperator; +import org.apache.heron.streamlet.impl.operators.IStreamletBasicOperator; +import org.apache.heron.streamlet.impl.operators.IStreamletOperator; /** * A Streamlet is a (potentially unbounded) ordered collection of tuples. diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java index e013012c2fe..f5604d9ed25 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java @@ -42,8 +42,8 @@ import org.apache.heron.streamlet.Source; import org.apache.heron.streamlet.Streamlet; import org.apache.heron.streamlet.WindowConfig; -import org.apache.heron.streamlet.impl.operators.ICustomBasicOperator; -import org.apache.heron.streamlet.impl.operators.ICustomOperator; +import org.apache.heron.streamlet.impl.operators.IStreamletBasicOperator; +import org.apache.heron.streamlet.impl.operators.IStreamletOperator; import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet; import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet; import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet; @@ -506,5 +506,4 @@ public Streamlet applyOperator(IStreamletOperator operator) { addChild(customStreamlet); return customStreamlet; } - } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java index 27414a77c5b..37c2a5b2df2 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java @@ -51,9 +51,7 @@ * .CustomOperator(new MyOperator) * .... */ -public abstract class CustomOperator - extends StreamletOperator - implements ICustomOperator { +public abstract class CustomOperator extends StreamletOperator { private OutputCollector outputCollector; diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ICustomBasicOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/IStreamletBasicOperator.java similarity index 93% rename from heron/api/src/java/org/apache/heron/streamlet/impl/operators/ICustomBasicOperator.java rename to heron/api/src/java/org/apache/heron/streamlet/impl/operators/IStreamletBasicOperator.java index 22647030501..3245178403d 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ICustomBasicOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/IStreamletBasicOperator.java @@ -25,5 +25,5 @@ /** * The interface for custom basic operators. It is used for existing bolts (subclasses of IBasicBolt). */ -public interface ICustomBasicOperator extends IBasicBolt { +public interface IStreamletBasicOperator extends IBasicBolt { } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ICustomOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/IStreamletOperator.java similarity index 94% rename from heron/api/src/java/org/apache/heron/streamlet/impl/operators/ICustomOperator.java rename to heron/api/src/java/org/apache/heron/streamlet/impl/operators/IStreamletOperator.java index 4bf6acc93a0..922fb09d48d 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/ICustomOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/IStreamletOperator.java @@ -26,5 +26,5 @@ * The interface for custom operators: including new user defined operators as well as * operators based on existing Bolts (subclasses of IRichBolt). */ -public interface ICustomOperator extends IRichBolt { +public interface IStreamletOperator extends IRichBolt { } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java index b0f71c0050d..56f4a4ec314 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java @@ -28,6 +28,7 @@ import org.apache.heron.streamlet.IStreamletRichOperator; import org.apache.heron.streamlet.IStreamletWindowOperator; import org.apache.heron.streamlet.impl.StreamletImpl; +import org.apache.heron.streamlet.impl.operators.IStreamletOperator; /** * CustomStreamlet represents a Streamlet that is made up of applying the user diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java index 96c34b9edff..365febe7b36 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java @@ -25,7 +25,11 @@ import org.apache.heron.api.topology.TopologyBuilder; import org.apache.heron.streamlet.SerializableFunction; import org.apache.heron.streamlet.impl.StreamletImpl; +<<<<<<< a9fdd20018604d0fc75632ad69cd033e0c9ab591:heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java import org.apache.heron.streamlet.impl.operators.MapOperator; +======= +import org.apache.heron.streamlet.impl.operators.IStreamletBasicOperator; +>>>>>>> Refactor ICustomOperator to IStreamletOperator:heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java /** * MapStreamlet represents a Streamlet that is made up of applying the user From 8724dac4a5a9e184e91d6ca9390dd205560340b0 Mon Sep 17 00:00:00 2001 From: Ning Wang Date: Tue, 16 Oct 2018 09:45:24 -0700 Subject: [PATCH 03/10] Add unit tests --- .../impl/operators/CustomOperator.java | 4 +- .../impl/operators/CustomOperatorOutput.java | 103 ++++++++++--- heron/api/tests/java/BUILD | 1 + .../heron/resource/TestCustomOperator.java | 44 ++++++ .../streamlet/impl/StreamletImplTest.java | 32 ++++ .../operators/CustomOperatorOutputTest.java | 43 ++++++ .../impl/operators/CustomOperatorTest.java | 138 ++++++++++++++++++ 7 files changed, 344 insertions(+), 21 deletions(-) create mode 100644 heron/api/tests/java/org/apache/heron/resource/TestCustomOperator.java create mode 100644 heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutputTest.java create mode 100644 heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorTest.java diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java index 37c2a5b2df2..903bfd44886 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java @@ -40,7 +40,7 @@ * } * * @override - * public CustomOperatorOutput CustomOperatorOutput process(String data) { + * public Optional> CustomOperatorOutput process(String data) { * ... * } * } @@ -48,7 +48,7 @@ * to implement more advanced features. * 2. Use it in Streamlet * .... - * .CustomOperator(new MyOperator) + * .perform(new MyOperator()) * .... */ public abstract class CustomOperator extends StreamletOperator { diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutput.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutput.java index 40f87afbd61..f024958a671 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutput.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutput.java @@ -20,6 +20,7 @@ package org.apache.heron.streamlet.impl.operators; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -39,49 +40,113 @@ */ public final class CustomOperatorOutput { private boolean anchored; // If anchors should be added when emitting tuples? - private Map> data; // Output data to be emitted. + private Map> output; // Stream id to output data to be emitted. // Disable constructor. User should use the static functions below to create objects - private CustomOperatorOutput() { + private CustomOperatorOutput(Map> data) { + this.anchored = true; + this.output = data; } + /** + * Get collected data + * @return data to be emitted. The data is a map of stream id to collection of objects + */ public Map> getData() { - return data; + return output; } + /** + * Check anchored flag + * @return true if the output data needs to be anchored when emitting + */ public boolean isAnchored() { return anchored; } + /** + * Append a collection of data to the default output stream + * @param newData collection of data to append + * @return this object itself + */ + public CustomOperatorOutput append(Collection newData) { + return append(Utils.DEFAULT_STREAM_ID, newData); + } + + /** + * Append a collection of data to the specified output stream + * @param streamId the name of the output stream + * @param newData collection of data to append + * @return this object itself + */ + public CustomOperatorOutput append(String streamId, Collection newData) { + if (output.containsKey(streamId)) { + Collection collection = output.get(streamId); + collection.addAll(newData); + output.put(streamId, newData); + } else { + output.put(streamId, newData); + } + + return this; + } + + /** + * If the output data needs to be anchored or not + * @param flag the anchored flag + * @return this object itself + */ + public CustomOperatorOutput withAnchor(boolean flag) { + anchored = flag; + return this; + } + /* Util static functions. Users should use them to create objects */ - public static Optional> success() { + + /** + * Generate a CustomOperatorOutput object with empty output + * @return a CustomOperatorOutput object with empty output + */ + public static Optional> succeed() { Map> dataMap = new HashMap>(); - return success(dataMap, true); + return succeed(dataMap); } - public static Optional> success(Collection data) { - return success(data, true); + /** + * Generate a CustomOperatorOutput object with a single object in the default stream + * @param data the data object to be added + * @return the generated CustomOperatorOutput object + */ + public static Optional> succeed(R data) { + return succeed(Arrays.asList(data)); } - public static Optional> success(Collection data, - boolean anchored) { + /** + * Generate a CustomOperatorOutput object with a collection of output objects + * in the default stream + * @param data the collection of data to be added + * @return the generated CustomOperatorOutput object + */ + public static Optional> succeed(Collection data) { Map> dataMap = new HashMap>(); dataMap.put(Utils.DEFAULT_STREAM_ID, data); - return success(dataMap, anchored); - } - - public static Optional> success(Map> data) { - return success(data, true); + return succeed(dataMap); } - public static Optional> success(Map> data, - boolean anchored) { - CustomOperatorOutput retval = new CustomOperatorOutput(); - retval.data = data; - retval.anchored = anchored; + /** + * Generate a CustomOperatorOutput object with a map of stream id to collection of output objects + * @param data the output data in a map of stream id to collection of output objects + * @return the generated CustomOperatorOutput object + */ + public static Optional> succeed(Map> data) { + CustomOperatorOutput retval = new CustomOperatorOutput(data); return Optional.of(retval); } + /** + * Generate a result that represents a fail result (Optional.empty) + * @return a result that represents a fail result (Optional.empty) + */ public static Optional> fail() { return Optional.empty(); } diff --git a/heron/api/tests/java/BUILD b/heron/api/tests/java/BUILD index 03c1f2bb5e5..37499fde290 100644 --- a/heron/api/tests/java/BUILD +++ b/heron/api/tests/java/BUILD @@ -28,6 +28,7 @@ java_tests( "org.apache.heron.api.metric.LatencyStatAndMetricTest", "org.apache.heron.api.bolt.BaseWindowedBoltTest", "org.apache.heron.streamlet.impl.StreamletImplTest", + "org.apache.heron.streamlet.impl.operators.CustomOperatorTest", "org.apache.heron.streamlet.impl.operators.JoinOperatorTest", "org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperatorTest", "org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperatorTest", diff --git a/heron/api/tests/java/org/apache/heron/resource/TestCustomOperator.java b/heron/api/tests/java/org/apache/heron/resource/TestCustomOperator.java new file mode 100644 index 00000000000..b863b4262fe --- /dev/null +++ b/heron/api/tests/java/org/apache/heron/resource/TestCustomOperator.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.heron.resource; + +import java.util.Optional; + +import org.apache.heron.streamlet.impl.operators.CustomOperator; +import org.apache.heron.streamlet.impl.operators.CustomOperatorOutput; + +public class TestCustomOperator extends CustomOperator { + @Override + public Optional> process(T data) { + // Success if data is positive + if (data.intValue() > 0) { + if (data.intValue() <= 100) { + // Emit to next component if data is <= 100 + return CustomOperatorOutput.succeed(data); + } else { + // Ignore data if it is greater than 100 + return CustomOperatorOutput.succeed(); + } + } else { + // Error if it is 0 or negative + return CustomOperatorOutput.fail(); + } + } +} diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java index 8a0b22d91c3..117027fc58f 100644 --- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java +++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java @@ -33,6 +33,7 @@ import org.apache.heron.resource.TestBasicBolt; import org.apache.heron.resource.TestBolt; import org.apache.heron.resource.TestWindowBolt; +import org.apache.heron.resource.TestCustomOperator; import org.apache.heron.streamlet.Config; import org.apache.heron.streamlet.Context; import org.apache.heron.streamlet.IStreamletBasicOperator; @@ -42,6 +43,7 @@ import org.apache.heron.streamlet.SerializableTransformer; import org.apache.heron.streamlet.Streamlet; import org.apache.heron.streamlet.WindowConfig; +import org.apache.heron.streamlet.impl.operators.IStreamletOperator; import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet; import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet; import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet; @@ -250,6 +252,36 @@ public void testCustomStreamletFromWindowBolt() throws Exception { assertEquals(supplierStreamlet.getChildren().get(0), streamlet); } + @Test + @SuppressWarnings("unchecked") + public void testCustomStreamlet() throws Exception { + Streamlet baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random()); + Streamlet streamlet = baseStreamlet.setNumPartitions(20) + .perform(new TestCustomOperator()); + assertTrue(streamlet instanceof CustomStreamlet); + CustomStreamlet mStreamlet = (CustomStreamlet) streamlet; + assertEquals(20, mStreamlet.getNumPartitions()); + SupplierStreamlet supplierStreamlet = (SupplierStreamlet) baseStreamlet; + assertEquals(supplierStreamlet.getChildren().size(), 1); + assertEquals(supplierStreamlet.getChildren().get(0), streamlet); + } + + private class MyBoltOperator extends TestBolt implements IStreamletOperator { + } + + @Test + @SuppressWarnings("unchecked") + public void testCustomStreamletFromBolt() throws Exception { + Streamlet baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random()); + Streamlet streamlet = baseStreamlet.setNumPartitions(20).perform(new MyBoltOperator()); + assertTrue(streamlet instanceof CustomStreamlet); + CustomStreamlet mStreamlet = (CustomStreamlet) streamlet; + assertEquals(20, mStreamlet.getNumPartitions()); + SupplierStreamlet supplierStreamlet = (SupplierStreamlet) baseStreamlet; + assertEquals(supplierStreamlet.getChildren().size(), 1); + assertEquals(supplierStreamlet.getChildren().get(0), streamlet); + } + @Test @SuppressWarnings("unchecked") public void testSimpleBuild() throws Exception { diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutputTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutputTest.java new file mode 100644 index 00000000000..4261c3fc2e6 --- /dev/null +++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutputTest.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.heron.streamlet.impl.operators; + +import java.util.Optional; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class CustomOperatorOutputTest { + @Test + public void testOutputSucceed() { + Optional> output = CustomOperatorOutput.succeed(); + assertTrue(output.isPresent()); + assertTrue(output.get().getData().isEmpty()); + assertTrue(output.get().isAnchored()); + } + + @Test + public void testOutputFail() { + Optional> output = CustomOperatorOutput.fail(); + assertFalse(output.isPresent()); + } +} diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorTest.java new file mode 100644 index 00000000000..fd28d8bf173 --- /dev/null +++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorTest.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.heron.streamlet.impl.operators; + +import java.util.Arrays; +import java.util.HashMap; + +import org.junit.Before; +import org.junit.Test; +import org.powermock.api.mockito.PowerMockito; + +import org.apache.heron.api.Config; +import org.apache.heron.api.bolt.OutputCollector; +import org.apache.heron.api.generated.TopologyAPI; +import org.apache.heron.api.topology.TopologyBuilder; +import org.apache.heron.api.topology.TopologyContext; +import org.apache.heron.api.tuple.Fields; +import org.apache.heron.api.tuple.Tuple; +import org.apache.heron.api.tuple.Values; +import org.apache.heron.common.utils.topology.TopologyContextImpl; +import org.apache.heron.common.utils.tuple.TupleImpl; +import org.apache.heron.resource.TestCustomOperator; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class CustomOperatorTest { + private TestCustomOperator testOperator; + + private Tuple getTuple(String stream, final Fields fields, Values values) { + TopologyAPI.StreamId streamId + = TopologyAPI.StreamId.newBuilder() + .setComponentName("custom1").setId(stream).build(); + TopologyContext topologyContext = getContext(fields); + return new TupleImpl(topologyContext, streamId, 0, + null, values, 1) { + @Override + public TopologyAPI.StreamId getSourceGlobalStreamId() { + return TopologyAPI.StreamId.newBuilder().setComponentName("sourceComponent") + .setId("default").build(); + } + }; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private TopologyContext getContext(final Fields fields) { + TopologyBuilder builder = new TopologyBuilder(); + return new TopologyContextImpl(new Config(), + builder.createTopology() + .setConfig(new Config()) + .setName("test") + .setState(TopologyAPI.TopologyState.RUNNING) + .getTopology(), + new HashMap(), 1, null) { + @Override + public Fields getComponentOutputFields( + String componentId, String streamId) { + return fields; + } + }; + } + + @Before + public void setUp() { + testOperator = new TestCustomOperator(); + } + + @Test + public void testOutputTuples() { + OutputCollector collector = PowerMockito.mock(OutputCollector.class); + testOperator.prepare(new Config(), PowerMockito.mock(TopologyContext.class), collector); + + Tuple[] tuples = { + getTuple("default", new Fields("a"), new Values(0)), // to fail + getTuple("default", new Fields("a"), new Values(1)), + getTuple("default", new Fields("a"), new Values(2)), + getTuple("default", new Fields("a"), new Values(3)), + getTuple("default", new Fields("a"), new Values(4)), + getTuple("default", new Fields("a"), new Values(5)), + getTuple("default", new Fields("a"), new Values(101)), // to skip + }; + + for (Tuple tuple: tuples) { + testOperator.execute(tuple); + } + + verify(collector, times(0)).emit(any(), eq(Arrays.asList(tuples[0])), any()); + verify(collector, times(0)).emit(any(), eq(Arrays.asList(tuples[6])), any()); + + verify(collector, times(1)).emit("default", Arrays.asList(tuples[1]), Arrays.asList(1)); + verify(collector, times(1)).emit("default", Arrays.asList(tuples[2]), Arrays.asList(2)); + verify(collector, times(1)).emit("default", Arrays.asList(tuples[3]), Arrays.asList(3)); + verify(collector, times(1)).emit("default", Arrays.asList(tuples[4]), Arrays.asList(4)); + verify(collector, times(1)).emit("default", Arrays.asList(tuples[5]), Arrays.asList(5)); + } + + @Test + public void testAckAndFail() { + OutputCollector collector = PowerMockito.mock(OutputCollector.class); + testOperator.prepare(new Config(), PowerMockito.mock(TopologyContext.class), collector); + + Tuple[] tuples = { + getTuple("default", new Fields("a"), new Values(0)), // tuple to fail + getTuple("default", new Fields("a"), new Values(1)), // tuples to process + getTuple("default", new Fields("a"), new Values(101)) // tuple to skip but still ack + }; + + for (Tuple tuple: tuples) { + testOperator.execute(tuple); + } + + verify(collector, times(0)).ack(tuples[0]); + verify(collector, times(1)).fail(tuples[0]); + verify(collector, times(1)).ack(tuples[1]); + verify(collector, times(0)).fail(tuples[1]); + verify(collector, times(1)).ack(tuples[2]); + verify(collector, times(0)).fail(tuples[2]); + } +} From 91d465b9f160581bcd3715384a659a7bb88b641d Mon Sep 17 00:00:00 2001 From: Ning Wang Date: Wed, 17 Oct 2018 16:26:02 -0700 Subject: [PATCH 04/10] clean up --- .../impl/operators/CustomOperator.java | 24 +++-- .../impl/operators/CustomOperatorOutput.java | 92 ++++++++----------- heron/api/tests/java/BUILD | 1 + .../heron/resource/TestCustomOperator.java | 8 +- .../operators/CustomOperatorOutputTest.java | 82 +++++++++++++++-- 5 files changed, 129 insertions(+), 78 deletions(-) diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java index 903bfd44886..96283e0caa5 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java @@ -21,9 +21,8 @@ package org.apache.heron.streamlet.impl.operators; import java.util.Arrays; -import java.util.Collection; +import java.util.List; import java.util.Map; -import java.util.Optional; import org.apache.heron.api.bolt.OutputCollector; import org.apache.heron.api.topology.TopologyContext; @@ -40,7 +39,7 @@ * } * * @override - * public Optional> CustomOperatorOutput process(String data) { + * public CustomOperatorOutput CustomOperatorOutput process(String data) { * ... * } * } @@ -58,10 +57,9 @@ public abstract class CustomOperator extends StreamletOperator { /** * Process function to be implemented by all custom operators. * @param data The data object to be processed - * @return a CustomOperatorOutput that contains process results and other flags. The output is wrapped - * in Optional. When the process failed, return none + * @return a CustomOperatorOutput that contains process results and other flags. */ - public abstract Optional> process(R data); + public abstract CustomOperatorOutput process(R data); /** * Called when a task for this component is initialized within a worker on the cluster. @@ -88,11 +86,11 @@ public void prepare(Map heronConf, @Override public void execute(Tuple tuple) { R data = (R) tuple.getValue(0); - Optional> results = process(data); + CustomOperatorOutput results = process(data); - if (results.isPresent()) { - Collection anchors = results.get().isAnchored() ? Arrays.asList(tuple) : null; - emitResults(results.get().getData(), anchors); + if (results.isSuccessful()) { + List anchors = results.isAnchored() ? Arrays.asList(tuple) : null; + emitResults(results.getData(), anchors); outputCollector.ack(tuple); } else { outputCollector.fail(tuple); @@ -101,11 +99,11 @@ public void execute(Tuple tuple) { /** * Convert process results to tuples and emit out to the downstream - * @param data results collections with corresponding stream id + * @param data results lists with corresponding stream id * @param anchors anchors to be used when emitting tuples */ - private void emitResults(Map> data, Collection anchors) { - for (Map.Entry> entry : data.entrySet()) { + private void emitResults(Map> data, List anchors) { + for (Map.Entry> entry : data.entrySet()) { String streamId = entry.getKey(); for (T value: entry.getValue()) { outputCollector.emit(streamId, anchors, new Values(value)); diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutput.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutput.java index f024958a671..29fdedd279e 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutput.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutput.java @@ -21,15 +21,12 @@ package org.apache.heron.streamlet.impl.operators; import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Optional; import org.apache.heron.api.utils.Utils; - - /** * CustomOperatorOutput is the class for data returned by CustomOperators' process() function. * CustomStreamlet is responsible for converting the output to emit/ack/fail is lowlevel API. @@ -39,56 +36,40 @@ * return CustomOperatorOutput.apply(data); */ public final class CustomOperatorOutput { - private boolean anchored; // If anchors should be added when emitting tuples? - private Map> output; // Stream id to output data to be emitted. + private Map> output; // Stream id to output data to be emitted. + private boolean successful; // If the execution succeeded? + private boolean anchored; // If anchors should be added when emitting tuples? // Disable constructor. User should use the static functions below to create objects - private CustomOperatorOutput(Map> data) { - this.anchored = true; + private CustomOperatorOutput(Map> data) { this.output = data; + this.successful = data != null; + this.anchored = true; } /** * Get collected data - * @return data to be emitted. The data is a map of stream id to collection of objects + * @return data to be emitted. The data is a map of stream id to list of objects */ - public Map> getData() { + public Map> getData() { return output; } /** - * Check anchored flag - * @return true if the output data needs to be anchored when emitting + * Check successful flag + * @return true if the execution succeeded. If not successful, fail() will be called + * instead of ack() in bolt */ - public boolean isAnchored() { - return anchored; + public boolean isSuccessful() { + return successful; } /** - * Append a collection of data to the default output stream - * @param newData collection of data to append - * @return this object itself - */ - public CustomOperatorOutput append(Collection newData) { - return append(Utils.DEFAULT_STREAM_ID, newData); - } - - /** - * Append a collection of data to the specified output stream - * @param streamId the name of the output stream - * @param newData collection of data to append - * @return this object itself + * Check anchored flag + * @return true if the output data needs to be anchored when emitting */ - public CustomOperatorOutput append(String streamId, Collection newData) { - if (output.containsKey(streamId)) { - Collection collection = output.get(streamId); - collection.addAll(newData); - output.put(streamId, newData); - } else { - output.put(streamId, newData); - } - - return this; + public boolean isAnchored() { + return anchored; } /** @@ -96,19 +77,23 @@ public CustomOperatorOutput append(String streamId, Collection newData) { * @param flag the anchored flag * @return this object itself */ - public CustomOperatorOutput withAnchor(boolean flag) { + public CustomOperatorOutput withAnchor(boolean flag) { anchored = flag; return this; } /* Util static functions. Users should use them to create objects */ + public static CustomOperatorOutput create() { + Map> dataMap = new HashMap>(); + return new CustomOperatorOutput(dataMap); + } /** * Generate a CustomOperatorOutput object with empty output * @return a CustomOperatorOutput object with empty output */ - public static Optional> succeed() { - Map> dataMap = new HashMap>(); + public static CustomOperatorOutput succeed() { + Map> dataMap = new HashMap>(); return succeed(dataMap); } @@ -117,37 +102,38 @@ public static Optional> succeed() { * @param data the data object to be added * @return the generated CustomOperatorOutput object */ - public static Optional> succeed(R data) { + public static CustomOperatorOutput succeed(R data) { return succeed(Arrays.asList(data)); } /** - * Generate a CustomOperatorOutput object with a collection of output objects + * Generate a CustomOperatorOutput object with a list of output objects * in the default stream - * @param data the collection of data to be added + * @param data the list of data to be added * @return the generated CustomOperatorOutput object */ - public static Optional> succeed(Collection data) { - Map> dataMap = new HashMap>(); + public static CustomOperatorOutput succeed(List data) { + Map> dataMap = new HashMap>(); dataMap.put(Utils.DEFAULT_STREAM_ID, data); return succeed(dataMap); } /** - * Generate a CustomOperatorOutput object with a map of stream id to collection of output objects - * @param data the output data in a map of stream id to collection of output objects + * Generate a CustomOperatorOutput object with a map of stream id to list of output objects + * @param data the output data in a map of stream id to list of output objects * @return the generated CustomOperatorOutput object */ - public static Optional> succeed(Map> data) { + public static CustomOperatorOutput succeed(Map> data) { CustomOperatorOutput retval = new CustomOperatorOutput(data); - return Optional.of(retval); + return retval; } /** - * Generate a result that represents a fail result (Optional.empty) - * @return a result that represents a fail result (Optional.empty) + * Generate a result that represents a fail result + * @return a result that represents a fail result */ - public static Optional> fail() { - return Optional.empty(); + public static CustomOperatorOutput fail() { + CustomOperatorOutput failed = new CustomOperatorOutput(null); + return failed; } } diff --git a/heron/api/tests/java/BUILD b/heron/api/tests/java/BUILD index 37499fde290..95805bda2da 100644 --- a/heron/api/tests/java/BUILD +++ b/heron/api/tests/java/BUILD @@ -28,6 +28,7 @@ java_tests( "org.apache.heron.api.metric.LatencyStatAndMetricTest", "org.apache.heron.api.bolt.BaseWindowedBoltTest", "org.apache.heron.streamlet.impl.StreamletImplTest", + "org.apache.heron.streamlet.impl.operators.CustomOperatorOutputTest", "org.apache.heron.streamlet.impl.operators.CustomOperatorTest", "org.apache.heron.streamlet.impl.operators.JoinOperatorTest", "org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperatorTest", diff --git a/heron/api/tests/java/org/apache/heron/resource/TestCustomOperator.java b/heron/api/tests/java/org/apache/heron/resource/TestCustomOperator.java index b863b4262fe..395ebd64ab8 100644 --- a/heron/api/tests/java/org/apache/heron/resource/TestCustomOperator.java +++ b/heron/api/tests/java/org/apache/heron/resource/TestCustomOperator.java @@ -19,14 +19,12 @@ package org.apache.heron.resource; -import java.util.Optional; - import org.apache.heron.streamlet.impl.operators.CustomOperator; import org.apache.heron.streamlet.impl.operators.CustomOperatorOutput; public class TestCustomOperator extends CustomOperator { @Override - public Optional> process(T data) { + public CustomOperatorOutput process(T data) { // Success if data is positive if (data.intValue() > 0) { if (data.intValue() <= 100) { @@ -34,11 +32,11 @@ public Optional> process(T data) { return CustomOperatorOutput.succeed(data); } else { // Ignore data if it is greater than 100 - return CustomOperatorOutput.succeed(); + return CustomOperatorOutput.succeed(); } } else { // Error if it is 0 or negative - return CustomOperatorOutput.fail(); + return CustomOperatorOutput.fail(); } } } diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutputTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutputTest.java index 4261c3fc2e6..e25ae75e059 100644 --- a/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutputTest.java +++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutputTest.java @@ -19,25 +19,93 @@ package org.apache.heron.streamlet.impl.operators; -import java.util.Optional; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; import org.junit.Test; +import org.apache.heron.api.utils.Utils; + +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class CustomOperatorOutputTest { @Test public void testOutputSucceed() { - Optional> output = CustomOperatorOutput.succeed(); - assertTrue(output.isPresent()); - assertTrue(output.get().getData().isEmpty()); - assertTrue(output.get().isAnchored()); + CustomOperatorOutput output = CustomOperatorOutput.succeed(); + assertTrue(output.isSuccessful()); + assertTrue(output.getData().isEmpty()); + assertTrue(output.isAnchored()); + } + + @Test + public void testOutputSucceedWithObject() { + CustomOperatorOutput output = CustomOperatorOutput.succeed(100); + assertTrue(output.isSuccessful()); + assertEquals(output.getData().size(), 1); + assertTrue(output.getData().containsKey(Utils.DEFAULT_STREAM_ID)); + + List data = output.getData().get(Utils.DEFAULT_STREAM_ID); + assertEquals(data.size(), 1); + assertEquals(data.toArray()[0], 100); + assertTrue(output.isAnchored()); + } + + @Test + public void testOutputSucceedWithList() { + CustomOperatorOutput output = + CustomOperatorOutput.succeed(Arrays.asList(100)); + assertTrue(output.isSuccessful()); + assertEquals(output.getData().size(), 1); + assertTrue(output.getData().containsKey(Utils.DEFAULT_STREAM_ID)); + + List data = output.getData().get(Utils.DEFAULT_STREAM_ID); + assertEquals(data.size(), 1); + assertEquals(data.toArray()[0], 100); + assertTrue(output.isAnchored()); + } + + @Test + public void testOutputSucceedWithMap() { + HashMap> map = new HashMap>(); + List list1 = Arrays.asList(100); + List list2 = Arrays.asList(200); + map.put("stream1", list1); + map.put("stream2", list2); + + CustomOperatorOutput output = CustomOperatorOutput.succeed(map); + assertTrue(output.isSuccessful()); + assertEquals(output.getData().size(), 2); + assertTrue(output.getData().containsKey("stream1")); + assertTrue(output.getData().containsKey("stream2")); + + List data = output.getData().get("stream1"); + assertEquals(data.size(), 1); + assertEquals(data.toArray()[0], 100); + assertTrue(output.isAnchored()); + + data = output.getData().get("stream2"); + assertEquals(data.size(), 1); + assertEquals(data.toArray()[0], 200); + assertTrue(output.isAnchored()); } @Test public void testOutputFail() { - Optional> output = CustomOperatorOutput.fail(); - assertFalse(output.isPresent()); + CustomOperatorOutput output = CustomOperatorOutput.fail(); + assertFalse(output.isSuccessful()); + } + + @Test + public void testOutputWithAnchor() { + CustomOperatorOutput anchored = + CustomOperatorOutput.create().withAnchor(true); + assertTrue(anchored.isAnchored()); + + CustomOperatorOutput unanchored = + CustomOperatorOutput.create().withAnchor(false); + assertFalse(unanchored.isAnchored()); } } From 3ff43e6062a9464ea2b9f122936d895a10af27c7 Mon Sep 17 00:00:00 2001 From: Ning Wang Date: Thu, 18 Oct 2018 11:23:35 -0700 Subject: [PATCH 05/10] Move IStreamletOperator and update comments --- .../streamlet/IStreamletBasicOperator.java | 4 +-- .../org/apache/heron/streamlet/Streamlet.java | 2 -- .../heron/streamlet/impl/StreamletImpl.java | 2 -- .../operators/IStreamletBasicOperator.java | 29 ------------------ .../impl/operators/IStreamletOperator.java | 30 ------------------- .../impl/streamlets/CustomStreamlet.java | 1 - .../impl/streamlets/LogStreamlet.java | 3 ++ .../streamlet/impl/StreamletImplTest.java | 1 - 8 files changed, 5 insertions(+), 67 deletions(-) delete mode 100644 heron/api/src/java/org/apache/heron/streamlet/impl/operators/IStreamletBasicOperator.java delete mode 100644 heron/api/src/java/org/apache/heron/streamlet/impl/operators/IStreamletOperator.java diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java index 98b87f355c2..c0f5d06bffd 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java @@ -23,8 +23,8 @@ import org.apache.heron.api.bolt.IBasicBolt; /** - * The interface for streamlet operators. It can be used to create - * operators based on existing Bolts (subclasses of IBasicBolt). + * The interface for streamlet basic operators. It is used to support existing user bolts + * extended from IBasicBolt only. It shouldn't be used to create streamlet operators. */ public interface IStreamletBasicOperator extends IStreamletOperator, IBasicBolt { } diff --git a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java index 26bd824f7d1..b5fe93162fd 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java +++ b/heron/api/src/java/org/apache/heron/streamlet/Streamlet.java @@ -22,8 +22,6 @@ import java.util.List; import org.apache.heron.classification.InterfaceStability; -import org.apache.heron.streamlet.impl.operators.IStreamletBasicOperator; -import org.apache.heron.streamlet.impl.operators.IStreamletOperator; /** * A Streamlet is a (potentially unbounded) ordered collection of tuples. diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java index f5604d9ed25..ace4bb681fc 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java @@ -42,8 +42,6 @@ import org.apache.heron.streamlet.Source; import org.apache.heron.streamlet.Streamlet; import org.apache.heron.streamlet.WindowConfig; -import org.apache.heron.streamlet.impl.operators.IStreamletBasicOperator; -import org.apache.heron.streamlet.impl.operators.IStreamletOperator; import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet; import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet; import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet; diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/IStreamletBasicOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/IStreamletBasicOperator.java deleted file mode 100644 index 3245178403d..00000000000 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/IStreamletBasicOperator.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -package org.apache.heron.streamlet.impl.operators; - -import org.apache.heron.api.bolt.IBasicBolt; - -/** - * The interface for custom basic operators. It is used for existing bolts (subclasses of IBasicBolt). - */ -public interface IStreamletBasicOperator extends IBasicBolt { -} diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/IStreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/IStreamletOperator.java deleted file mode 100644 index 922fb09d48d..00000000000 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/IStreamletOperator.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -package org.apache.heron.streamlet.impl.operators; - -import org.apache.heron.api.bolt.IRichBolt; - -/** - * The interface for custom operators: including new user defined operators as well as - * operators based on existing Bolts (subclasses of IRichBolt). - */ -public interface IStreamletOperator extends IRichBolt { -} diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java index 56f4a4ec314..b0f71c0050d 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomStreamlet.java @@ -28,7 +28,6 @@ import org.apache.heron.streamlet.IStreamletRichOperator; import org.apache.heron.streamlet.IStreamletWindowOperator; import org.apache.heron.streamlet.impl.StreamletImpl; -import org.apache.heron.streamlet.impl.operators.IStreamletOperator; /** * CustomStreamlet represents a Streamlet that is made up of applying the user diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java index 707751f5dad..9dd70426ee6 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java @@ -24,7 +24,10 @@ import org.apache.heron.api.topology.TopologyBuilder; import org.apache.heron.streamlet.impl.StreamletImpl; +<<<<<<< 91d465b9f160581bcd3715384a659a7bb88b641d:heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java import org.apache.heron.streamlet.impl.sinks.LogSink; +======= +>>>>>>> Move IStreamletOperator and update comments:heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java /** * LogStreamlet represents en empty Streamlet that is made up of elements from the parent diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java index 117027fc58f..12193d02adf 100644 --- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java +++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java @@ -43,7 +43,6 @@ import org.apache.heron.streamlet.SerializableTransformer; import org.apache.heron.streamlet.Streamlet; import org.apache.heron.streamlet.WindowConfig; -import org.apache.heron.streamlet.impl.operators.IStreamletOperator; import org.apache.heron.streamlet.impl.streamlets.ConsumerStreamlet; import org.apache.heron.streamlet.impl.streamlets.CustomStreamlet; import org.apache.heron.streamlet.impl.streamlets.FilterStreamlet; From 3598935c369f2c0de4832c6c1a3e4db2219edcd4 Mon Sep 17 00:00:00 2001 From: Ning Wang Date: Fri, 19 Oct 2018 08:17:07 -0700 Subject: [PATCH 06/10] Rename perform() to applyOperator() --- .../org/apache/heron/streamlet/impl/StreamletImplTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java index 12193d02adf..b9ae29d2d75 100644 --- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java +++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java @@ -256,7 +256,7 @@ public void testCustomStreamletFromWindowBolt() throws Exception { public void testCustomStreamlet() throws Exception { Streamlet baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random()); Streamlet streamlet = baseStreamlet.setNumPartitions(20) - .perform(new TestCustomOperator()); + .applyOperator(new TestCustomOperator()); assertTrue(streamlet instanceof CustomStreamlet); CustomStreamlet mStreamlet = (CustomStreamlet) streamlet; assertEquals(20, mStreamlet.getNumPartitions()); @@ -272,7 +272,8 @@ private class MyBoltOperator extends TestBolt implements IStreamletOperator baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random()); - Streamlet streamlet = baseStreamlet.setNumPartitions(20).perform(new MyBoltOperator()); + Streamlet streamlet = baseStreamlet.setNumPartitions(20). + applyOperator(new MyBoltOperator()); assertTrue(streamlet instanceof CustomStreamlet); CustomStreamlet mStreamlet = (CustomStreamlet) streamlet; assertEquals(20, mStreamlet.getNumPartitions()); From b9e889b87d3d13b6a6ee9a78292b755a8fa80cbb Mon Sep 17 00:00:00 2001 From: Ning Wang Date: Wed, 24 Oct 2018 12:51:07 -0700 Subject: [PATCH 07/10] Clean up after rebase --- .../streamlet/IStreamletBasicOperator.java | 4 ++-- .../heron/streamlet/IStreamletOperator.java | 5 +++++ .../impl/operators/CustomOperator.java | 3 ++- .../streamlet/impl/StreamletImplTest.java | 19 +------------------ 4 files changed, 10 insertions(+), 21 deletions(-) diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java index c0f5d06bffd..98b87f355c2 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletBasicOperator.java @@ -23,8 +23,8 @@ import org.apache.heron.api.bolt.IBasicBolt; /** - * The interface for streamlet basic operators. It is used to support existing user bolts - * extended from IBasicBolt only. It shouldn't be used to create streamlet operators. + * The interface for streamlet operators. It can be used to create + * operators based on existing Bolts (subclasses of IBasicBolt). */ public interface IStreamletBasicOperator extends IStreamletOperator, IBasicBolt { } diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java index 0eecb60b494..00fcb4e160d 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java @@ -21,7 +21,12 @@ package org.apache.heron.streamlet; /** +<<<<<<< 3598935c369f2c0de4832c6c1a3e4db2219edcd4 * The base interface for all Streamlet operator interfaces. +======= + * The interface for custom operators: it can be used to create + * operators based on existing Bolts (subclasses of IRichBolt). +>>>>>>> Clean up after rebase */ public interface IStreamletOperator { } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java index 96283e0caa5..a199d6f5320 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java @@ -47,7 +47,7 @@ * to implement more advanced features. * 2. Use it in Streamlet * .... - * .perform(new MyOperator()) + * .applyOperator(new MyOperator()) * .... */ public abstract class CustomOperator extends StreamletOperator { @@ -68,6 +68,7 @@ public abstract class CustomOperator extends StreamletOperator { * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc. * @param collector The collector is used to emit tuples from this bolt. Tuples can be emitted at any time, including the prepare and cleanup methods. The collector is thread-safe and should be saved as an instance variable of this bolt object. */ + @Override public void prepare(Map heronConf, TopologyContext context, OutputCollector collector) { diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java index b9ae29d2d75..23c42c170bf 100644 --- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java +++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java @@ -32,8 +32,8 @@ import org.apache.heron.common.basics.ByteAmount; import org.apache.heron.resource.TestBasicBolt; import org.apache.heron.resource.TestBolt; -import org.apache.heron.resource.TestWindowBolt; import org.apache.heron.resource.TestCustomOperator; +import org.apache.heron.resource.TestWindowBolt; import org.apache.heron.streamlet.Config; import org.apache.heron.streamlet.Context; import org.apache.heron.streamlet.IStreamletBasicOperator; @@ -265,23 +265,6 @@ public void testCustomStreamlet() throws Exception { assertEquals(supplierStreamlet.getChildren().get(0), streamlet); } - private class MyBoltOperator extends TestBolt implements IStreamletOperator { - } - - @Test - @SuppressWarnings("unchecked") - public void testCustomStreamletFromBolt() throws Exception { - Streamlet baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random()); - Streamlet streamlet = baseStreamlet.setNumPartitions(20). - applyOperator(new MyBoltOperator()); - assertTrue(streamlet instanceof CustomStreamlet); - CustomStreamlet mStreamlet = (CustomStreamlet) streamlet; - assertEquals(20, mStreamlet.getNumPartitions()); - SupplierStreamlet supplierStreamlet = (SupplierStreamlet) baseStreamlet; - assertEquals(supplierStreamlet.getChildren().size(), 1); - assertEquals(supplierStreamlet.getChildren().get(0), streamlet); - } - @Test @SuppressWarnings("unchecked") public void testSimpleBuild() throws Exception { From c0ff2f79df69db3a41b9219880903738fcaea067 Mon Sep 17 00:00:00 2001 From: Ning Wang Date: Thu, 8 Nov 2018 15:04:01 -0800 Subject: [PATCH 08/10] rebase master --- .../java/org/apache/heron/streamlet/IStreamletOperator.java | 5 ----- .../apache/heron/streamlet/impl/streamlets/LogStreamlet.java | 3 --- .../apache/heron/streamlet/impl/streamlets/MapStreamlet.java | 4 ---- 3 files changed, 12 deletions(-) diff --git a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java index 00fcb4e160d..0eecb60b494 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java +++ b/heron/api/src/java/org/apache/heron/streamlet/IStreamletOperator.java @@ -21,12 +21,7 @@ package org.apache.heron.streamlet; /** -<<<<<<< 3598935c369f2c0de4832c6c1a3e4db2219edcd4 * The base interface for all Streamlet operator interfaces. -======= - * The interface for custom operators: it can be used to create - * operators based on existing Bolts (subclasses of IRichBolt). ->>>>>>> Clean up after rebase */ public interface IStreamletOperator { } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java index 9dd70426ee6..707751f5dad 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java @@ -24,10 +24,7 @@ import org.apache.heron.api.topology.TopologyBuilder; import org.apache.heron.streamlet.impl.StreamletImpl; -<<<<<<< 91d465b9f160581bcd3715384a659a7bb88b641d:heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/LogStreamlet.java import org.apache.heron.streamlet.impl.sinks.LogSink; -======= ->>>>>>> Move IStreamletOperator and update comments:heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java /** * LogStreamlet represents en empty Streamlet that is made up of elements from the parent diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java index 365febe7b36..96c34b9edff 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java @@ -25,11 +25,7 @@ import org.apache.heron.api.topology.TopologyBuilder; import org.apache.heron.streamlet.SerializableFunction; import org.apache.heron.streamlet.impl.StreamletImpl; -<<<<<<< a9fdd20018604d0fc75632ad69cd033e0c9ab591:heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/MapStreamlet.java import org.apache.heron.streamlet.impl.operators.MapOperator; -======= -import org.apache.heron.streamlet.impl.operators.IStreamletBasicOperator; ->>>>>>> Refactor ICustomOperator to IStreamletOperator:heron/api/src/java/org/apache/heron/streamlet/impl/streamlets/CustomBasicStreamlet.java /** * MapStreamlet represents a Streamlet that is made up of applying the user From 6d7259af043b11007c0c6f40f22c20dff687c574 Mon Sep 17 00:00:00 2001 From: Ning Wang Date: Thu, 8 Nov 2018 15:28:35 -0800 Subject: [PATCH 09/10] Add CustomOperator unit tests in Scala --- .../heron/resource/TestCustomOperator.scala | 40 ++++++++++++++++ .../scala/impl/StreamletImplTest.scala | 46 +++++++++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 heron/api/tests/scala/org/apache/heron/resource/TestCustomOperator.scala diff --git a/heron/api/tests/scala/org/apache/heron/resource/TestCustomOperator.scala b/heron/api/tests/scala/org/apache/heron/resource/TestCustomOperator.scala new file mode 100644 index 00000000000..668f726074f --- /dev/null +++ b/heron/api/tests/scala/org/apache/heron/resource/TestCustomOperator.scala @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.heron.resource + +import org.apache.heron.streamlet.impl.operators.{CustomOperator, CustomOperatorOutput} + +class TestCustomOperator extends CustomOperator[Double, Double] { + override def process(data: Double): CustomOperatorOutput[Double] = { + // Success if data is positive + if (data > 0) { + if (data <= 100) { + // Emit to next component if data is <= 100 + CustomOperatorOutput.succeed(data) + } else { + // Ignore data if it is greater than 100 + CustomOperatorOutput.succeed[Double]() + } + } else { + // Error if it is 0 or negative + CustomOperatorOutput.fail[Double]() + } + } +} diff --git a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala index b8437327ff9..381746f7c38 100644 --- a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala +++ b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala @@ -25,6 +25,7 @@ import org.junit.Assert.{assertEquals, assertTrue} import org.apache.heron.resource.{ TestBasicBolt, TestBolt, + TestCustomOperator, TestWindowBolt } import org.apache.heron.streamlet.{ @@ -552,6 +553,51 @@ class StreamletImplTest extends BaseFunSuite { assertEquals(0, customStreamlet.getChildren.size()) } + test("StreamletImpl should support applyOperator operation on CustomOperator") { + val testOperator = new MyWindowBoltOperator() + val supplierStreamlet = builder + .newSource(() => Random.nextDouble()) + .setName("Supplier_Streamlet_1") + .setNumPartitions(3) + + supplierStreamlet + .map[Double] { num: Double => + num * 10 + } + .setName("Map_Streamlet_1") + .setNumPartitions(2) + .applyOperator(new TestCustomOperator) + .setName("CustomOperator_Streamlet_1") + .setNumPartitions(7) + + val supplierStreamletImpl = + supplierStreamlet.asInstanceOf[StreamletImpl[Double]] + assertEquals(1, supplierStreamletImpl.getChildren.size) + assertTrue( + supplierStreamletImpl + .getChildren(0) + .isInstanceOf[MapStreamlet[_, _]]) + val mapStreamlet = supplierStreamletImpl + .getChildren(0) + .asInstanceOf[MapStreamlet[Double, Double]] + assertEquals("Map_Streamlet_1", mapStreamlet.getName) + assertEquals(2, mapStreamlet.getNumPartitions) + assertEquals(1, mapStreamlet.getChildren.size()) + + assertTrue( + mapStreamlet + .getChildren() + .get(0) + .isInstanceOf[CustomStreamlet[_, _]]) + val customStreamlet = mapStreamlet + .getChildren() + .get(0) + .asInstanceOf[CustomStreamlet[Double, Double]] + assertEquals("CustomOperator_Streamlet_1", customStreamlet.getName) + assertEquals(7, customStreamlet.getNumPartitions) + assertEquals(0, customStreamlet.getChildren.size()) + } + test("StreamletImpl should support reduce operation") { val supplierStreamlet = builder .newSource(() => Random.nextInt(10)) From 0503bf6be57b37477446114ab4f341c725ac56bd Mon Sep 17 00:00:00 2001 From: Ning Wang Date: Thu, 8 Nov 2018 15:35:12 -0800 Subject: [PATCH 10/10] clean up --- .../apache/heron/streamlet/scala/impl/StreamletImplTest.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala index 381746f7c38..9204f7df941 100644 --- a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala +++ b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala @@ -410,7 +410,6 @@ class StreamletImplTest extends BaseFunSuite { } test("StreamletImpl should support applyOperator operation on IStreamletRichOperator") { - val testOperator = new MyBoltOperator() val supplierStreamlet = builder .newSource(() => Random.nextDouble()) @@ -553,7 +552,7 @@ class StreamletImplTest extends BaseFunSuite { assertEquals(0, customStreamlet.getChildren.size()) } - test("StreamletImpl should support applyOperator operation on CustomOperator") { + test("StreamletImpl should support applyOperator operation on CustomOperator") { val testOperator = new MyWindowBoltOperator() val supplierStreamlet = builder .newSource(() => Random.nextDouble())