diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java index 066dc743fdc..ace4bb681fc 100644 --- a/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/StreamletImpl.java @@ -504,5 +504,4 @@ public Streamlet applyOperator(IStreamletOperator operator) { addChild(customStreamlet); return customStreamlet; } - } diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java new file mode 100644 index 00000000000..a199d6f5320 --- /dev/null +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperator.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.heron.streamlet.impl.operators; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.heron.api.bolt.OutputCollector; +import org.apache.heron.api.topology.TopologyContext; +import org.apache.heron.api.tuple.Tuple; +import org.apache.heron.api.tuple.Values; + +/** + * CustomOperator is the base class for all user defined operators. + * Usage: + * 1. Create user defined operator + * class MyOperator extends CustomOperator { + * public MyOperator() { + * ... + * } + * + * @override + * public CustomOperatorOutput CustomOperatorOutput process(String data) { + * ... + * } + * } + * Note that users can override low level bolt functions like getComponentConfiguration() in order + * to implement more advanced features. + * 2. Use it in Streamlet + * .... + * .applyOperator(new MyOperator()) + * .... + */ +public abstract class CustomOperator extends StreamletOperator { + + private OutputCollector outputCollector; + + /** + * Process function to be implemented by all custom operators. + * @param data The data object to be processed + * @return a CustomOperatorOutput that contains process results and other flags. + */ + public abstract CustomOperatorOutput process(R data); + + /** + * Called when a task for this component is initialized within a worker on the cluster. + * It provides the bolt with the environment in which the bolt executes. + * @param heronConf The Heron configuration for this bolt. This is the configuration provided to the topology merged in with cluster configuration on this machine. + * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc. + * @param collector The collector is used to emit tuples from this bolt. Tuples can be emitted at any time, including the prepare and cleanup methods. The collector is thread-safe and should be saved as an instance variable of this bolt object. + */ + @Override + public void prepare(Map heronConf, + TopologyContext context, + OutputCollector collector) { + + this.outputCollector = collector; + } + + /** + * Implementation of execute() function to support type safty in Streamlet API + * The function is responsible for: + * - convert incoming tuple to the specified type + * - call the new process() function which has type safty support + * - emit/ack/fail the results + */ + @SuppressWarnings("unchecked") + @Override + public void execute(Tuple tuple) { + R data = (R) tuple.getValue(0); + CustomOperatorOutput results = process(data); + + if (results.isSuccessful()) { + List anchors = results.isAnchored() ? Arrays.asList(tuple) : null; + emitResults(results.getData(), anchors); + outputCollector.ack(tuple); + } else { + outputCollector.fail(tuple); + } + } + + /** + * Convert process results to tuples and emit out to the downstream + * @param data results lists with corresponding stream id + * @param anchors anchors to be used when emitting tuples + */ + private void emitResults(Map> data, List anchors) { + for (Map.Entry> entry : data.entrySet()) { + String streamId = entry.getKey(); + for (T value: entry.getValue()) { + outputCollector.emit(streamId, anchors, new Values(value)); + } + } + } +} diff --git a/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutput.java b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutput.java new file mode 100644 index 00000000000..29fdedd279e --- /dev/null +++ b/heron/api/src/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutput.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.heron.streamlet.impl.operators; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.heron.api.utils.Utils; + +/** + * CustomOperatorOutput is the class for data returned by CustomOperators' process() function. + * CustomStreamlet is responsible for converting the output to emit/ack/fail is lowlevel API. + * + * Usage: + * Assuming data is already stored in an integer array: data, + * return CustomOperatorOutput.apply(data); + */ +public final class CustomOperatorOutput { + private Map> output; // Stream id to output data to be emitted. + private boolean successful; // If the execution succeeded? + private boolean anchored; // If anchors should be added when emitting tuples? + + // Disable constructor. User should use the static functions below to create objects + private CustomOperatorOutput(Map> data) { + this.output = data; + this.successful = data != null; + this.anchored = true; + } + + /** + * Get collected data + * @return data to be emitted. The data is a map of stream id to list of objects + */ + public Map> getData() { + return output; + } + + /** + * Check successful flag + * @return true if the execution succeeded. If not successful, fail() will be called + * instead of ack() in bolt + */ + public boolean isSuccessful() { + return successful; + } + + /** + * Check anchored flag + * @return true if the output data needs to be anchored when emitting + */ + public boolean isAnchored() { + return anchored; + } + + /** + * If the output data needs to be anchored or not + * @param flag the anchored flag + * @return this object itself + */ + public CustomOperatorOutput withAnchor(boolean flag) { + anchored = flag; + return this; + } + + /* Util static functions. Users should use them to create objects */ + public static CustomOperatorOutput create() { + Map> dataMap = new HashMap>(); + return new CustomOperatorOutput(dataMap); + } + + /** + * Generate a CustomOperatorOutput object with empty output + * @return a CustomOperatorOutput object with empty output + */ + public static CustomOperatorOutput succeed() { + Map> dataMap = new HashMap>(); + return succeed(dataMap); + } + + /** + * Generate a CustomOperatorOutput object with a single object in the default stream + * @param data the data object to be added + * @return the generated CustomOperatorOutput object + */ + public static CustomOperatorOutput succeed(R data) { + return succeed(Arrays.asList(data)); + } + + /** + * Generate a CustomOperatorOutput object with a list of output objects + * in the default stream + * @param data the list of data to be added + * @return the generated CustomOperatorOutput object + */ + public static CustomOperatorOutput succeed(List data) { + Map> dataMap = new HashMap>(); + dataMap.put(Utils.DEFAULT_STREAM_ID, data); + return succeed(dataMap); + } + + /** + * Generate a CustomOperatorOutput object with a map of stream id to list of output objects + * @param data the output data in a map of stream id to list of output objects + * @return the generated CustomOperatorOutput object + */ + public static CustomOperatorOutput succeed(Map> data) { + CustomOperatorOutput retval = new CustomOperatorOutput(data); + return retval; + } + + /** + * Generate a result that represents a fail result + * @return a result that represents a fail result + */ + public static CustomOperatorOutput fail() { + CustomOperatorOutput failed = new CustomOperatorOutput(null); + return failed; + } +} diff --git a/heron/api/tests/java/BUILD b/heron/api/tests/java/BUILD index 03c1f2bb5e5..95805bda2da 100644 --- a/heron/api/tests/java/BUILD +++ b/heron/api/tests/java/BUILD @@ -28,6 +28,8 @@ java_tests( "org.apache.heron.api.metric.LatencyStatAndMetricTest", "org.apache.heron.api.bolt.BaseWindowedBoltTest", "org.apache.heron.streamlet.impl.StreamletImplTest", + "org.apache.heron.streamlet.impl.operators.CustomOperatorOutputTest", + "org.apache.heron.streamlet.impl.operators.CustomOperatorTest", "org.apache.heron.streamlet.impl.operators.JoinOperatorTest", "org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperatorTest", "org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperatorTest", diff --git a/heron/api/tests/java/org/apache/heron/resource/TestCustomOperator.java b/heron/api/tests/java/org/apache/heron/resource/TestCustomOperator.java new file mode 100644 index 00000000000..395ebd64ab8 --- /dev/null +++ b/heron/api/tests/java/org/apache/heron/resource/TestCustomOperator.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.heron.resource; + +import org.apache.heron.streamlet.impl.operators.CustomOperator; +import org.apache.heron.streamlet.impl.operators.CustomOperatorOutput; + +public class TestCustomOperator extends CustomOperator { + @Override + public CustomOperatorOutput process(T data) { + // Success if data is positive + if (data.intValue() > 0) { + if (data.intValue() <= 100) { + // Emit to next component if data is <= 100 + return CustomOperatorOutput.succeed(data); + } else { + // Ignore data if it is greater than 100 + return CustomOperatorOutput.succeed(); + } + } else { + // Error if it is 0 or negative + return CustomOperatorOutput.fail(); + } + } +} diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java index 8a0b22d91c3..23c42c170bf 100644 --- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java +++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java @@ -32,6 +32,7 @@ import org.apache.heron.common.basics.ByteAmount; import org.apache.heron.resource.TestBasicBolt; import org.apache.heron.resource.TestBolt; +import org.apache.heron.resource.TestCustomOperator; import org.apache.heron.resource.TestWindowBolt; import org.apache.heron.streamlet.Config; import org.apache.heron.streamlet.Context; @@ -250,6 +251,20 @@ public void testCustomStreamletFromWindowBolt() throws Exception { assertEquals(supplierStreamlet.getChildren().get(0), streamlet); } + @Test + @SuppressWarnings("unchecked") + public void testCustomStreamlet() throws Exception { + Streamlet baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random()); + Streamlet streamlet = baseStreamlet.setNumPartitions(20) + .applyOperator(new TestCustomOperator()); + assertTrue(streamlet instanceof CustomStreamlet); + CustomStreamlet mStreamlet = (CustomStreamlet) streamlet; + assertEquals(20, mStreamlet.getNumPartitions()); + SupplierStreamlet supplierStreamlet = (SupplierStreamlet) baseStreamlet; + assertEquals(supplierStreamlet.getChildren().size(), 1); + assertEquals(supplierStreamlet.getChildren().get(0), streamlet); + } + @Test @SuppressWarnings("unchecked") public void testSimpleBuild() throws Exception { diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutputTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutputTest.java new file mode 100644 index 00000000000..e25ae75e059 --- /dev/null +++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorOutputTest.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.heron.streamlet.impl.operators; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.junit.Test; + +import org.apache.heron.api.utils.Utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class CustomOperatorOutputTest { + @Test + public void testOutputSucceed() { + CustomOperatorOutput output = CustomOperatorOutput.succeed(); + assertTrue(output.isSuccessful()); + assertTrue(output.getData().isEmpty()); + assertTrue(output.isAnchored()); + } + + @Test + public void testOutputSucceedWithObject() { + CustomOperatorOutput output = CustomOperatorOutput.succeed(100); + assertTrue(output.isSuccessful()); + assertEquals(output.getData().size(), 1); + assertTrue(output.getData().containsKey(Utils.DEFAULT_STREAM_ID)); + + List data = output.getData().get(Utils.DEFAULT_STREAM_ID); + assertEquals(data.size(), 1); + assertEquals(data.toArray()[0], 100); + assertTrue(output.isAnchored()); + } + + @Test + public void testOutputSucceedWithList() { + CustomOperatorOutput output = + CustomOperatorOutput.succeed(Arrays.asList(100)); + assertTrue(output.isSuccessful()); + assertEquals(output.getData().size(), 1); + assertTrue(output.getData().containsKey(Utils.DEFAULT_STREAM_ID)); + + List data = output.getData().get(Utils.DEFAULT_STREAM_ID); + assertEquals(data.size(), 1); + assertEquals(data.toArray()[0], 100); + assertTrue(output.isAnchored()); + } + + @Test + public void testOutputSucceedWithMap() { + HashMap> map = new HashMap>(); + List list1 = Arrays.asList(100); + List list2 = Arrays.asList(200); + map.put("stream1", list1); + map.put("stream2", list2); + + CustomOperatorOutput output = CustomOperatorOutput.succeed(map); + assertTrue(output.isSuccessful()); + assertEquals(output.getData().size(), 2); + assertTrue(output.getData().containsKey("stream1")); + assertTrue(output.getData().containsKey("stream2")); + + List data = output.getData().get("stream1"); + assertEquals(data.size(), 1); + assertEquals(data.toArray()[0], 100); + assertTrue(output.isAnchored()); + + data = output.getData().get("stream2"); + assertEquals(data.size(), 1); + assertEquals(data.toArray()[0], 200); + assertTrue(output.isAnchored()); + } + + @Test + public void testOutputFail() { + CustomOperatorOutput output = CustomOperatorOutput.fail(); + assertFalse(output.isSuccessful()); + } + + @Test + public void testOutputWithAnchor() { + CustomOperatorOutput anchored = + CustomOperatorOutput.create().withAnchor(true); + assertTrue(anchored.isAnchored()); + + CustomOperatorOutput unanchored = + CustomOperatorOutput.create().withAnchor(false); + assertFalse(unanchored.isAnchored()); + } +} diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorTest.java new file mode 100644 index 00000000000..fd28d8bf173 --- /dev/null +++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/operators/CustomOperatorTest.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.heron.streamlet.impl.operators; + +import java.util.Arrays; +import java.util.HashMap; + +import org.junit.Before; +import org.junit.Test; +import org.powermock.api.mockito.PowerMockito; + +import org.apache.heron.api.Config; +import org.apache.heron.api.bolt.OutputCollector; +import org.apache.heron.api.generated.TopologyAPI; +import org.apache.heron.api.topology.TopologyBuilder; +import org.apache.heron.api.topology.TopologyContext; +import org.apache.heron.api.tuple.Fields; +import org.apache.heron.api.tuple.Tuple; +import org.apache.heron.api.tuple.Values; +import org.apache.heron.common.utils.topology.TopologyContextImpl; +import org.apache.heron.common.utils.tuple.TupleImpl; +import org.apache.heron.resource.TestCustomOperator; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class CustomOperatorTest { + private TestCustomOperator testOperator; + + private Tuple getTuple(String stream, final Fields fields, Values values) { + TopologyAPI.StreamId streamId + = TopologyAPI.StreamId.newBuilder() + .setComponentName("custom1").setId(stream).build(); + TopologyContext topologyContext = getContext(fields); + return new TupleImpl(topologyContext, streamId, 0, + null, values, 1) { + @Override + public TopologyAPI.StreamId getSourceGlobalStreamId() { + return TopologyAPI.StreamId.newBuilder().setComponentName("sourceComponent") + .setId("default").build(); + } + }; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private TopologyContext getContext(final Fields fields) { + TopologyBuilder builder = new TopologyBuilder(); + return new TopologyContextImpl(new Config(), + builder.createTopology() + .setConfig(new Config()) + .setName("test") + .setState(TopologyAPI.TopologyState.RUNNING) + .getTopology(), + new HashMap(), 1, null) { + @Override + public Fields getComponentOutputFields( + String componentId, String streamId) { + return fields; + } + }; + } + + @Before + public void setUp() { + testOperator = new TestCustomOperator(); + } + + @Test + public void testOutputTuples() { + OutputCollector collector = PowerMockito.mock(OutputCollector.class); + testOperator.prepare(new Config(), PowerMockito.mock(TopologyContext.class), collector); + + Tuple[] tuples = { + getTuple("default", new Fields("a"), new Values(0)), // to fail + getTuple("default", new Fields("a"), new Values(1)), + getTuple("default", new Fields("a"), new Values(2)), + getTuple("default", new Fields("a"), new Values(3)), + getTuple("default", new Fields("a"), new Values(4)), + getTuple("default", new Fields("a"), new Values(5)), + getTuple("default", new Fields("a"), new Values(101)), // to skip + }; + + for (Tuple tuple: tuples) { + testOperator.execute(tuple); + } + + verify(collector, times(0)).emit(any(), eq(Arrays.asList(tuples[0])), any()); + verify(collector, times(0)).emit(any(), eq(Arrays.asList(tuples[6])), any()); + + verify(collector, times(1)).emit("default", Arrays.asList(tuples[1]), Arrays.asList(1)); + verify(collector, times(1)).emit("default", Arrays.asList(tuples[2]), Arrays.asList(2)); + verify(collector, times(1)).emit("default", Arrays.asList(tuples[3]), Arrays.asList(3)); + verify(collector, times(1)).emit("default", Arrays.asList(tuples[4]), Arrays.asList(4)); + verify(collector, times(1)).emit("default", Arrays.asList(tuples[5]), Arrays.asList(5)); + } + + @Test + public void testAckAndFail() { + OutputCollector collector = PowerMockito.mock(OutputCollector.class); + testOperator.prepare(new Config(), PowerMockito.mock(TopologyContext.class), collector); + + Tuple[] tuples = { + getTuple("default", new Fields("a"), new Values(0)), // tuple to fail + getTuple("default", new Fields("a"), new Values(1)), // tuples to process + getTuple("default", new Fields("a"), new Values(101)) // tuple to skip but still ack + }; + + for (Tuple tuple: tuples) { + testOperator.execute(tuple); + } + + verify(collector, times(0)).ack(tuples[0]); + verify(collector, times(1)).fail(tuples[0]); + verify(collector, times(1)).ack(tuples[1]); + verify(collector, times(0)).fail(tuples[1]); + verify(collector, times(1)).ack(tuples[2]); + verify(collector, times(0)).fail(tuples[2]); + } +} diff --git a/heron/api/tests/scala/org/apache/heron/resource/TestCustomOperator.scala b/heron/api/tests/scala/org/apache/heron/resource/TestCustomOperator.scala new file mode 100644 index 00000000000..668f726074f --- /dev/null +++ b/heron/api/tests/scala/org/apache/heron/resource/TestCustomOperator.scala @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.heron.resource + +import org.apache.heron.streamlet.impl.operators.{CustomOperator, CustomOperatorOutput} + +class TestCustomOperator extends CustomOperator[Double, Double] { + override def process(data: Double): CustomOperatorOutput[Double] = { + // Success if data is positive + if (data > 0) { + if (data <= 100) { + // Emit to next component if data is <= 100 + CustomOperatorOutput.succeed(data) + } else { + // Ignore data if it is greater than 100 + CustomOperatorOutput.succeed[Double]() + } + } else { + // Error if it is 0 or negative + CustomOperatorOutput.fail[Double]() + } + } +} diff --git a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala index b8437327ff9..9204f7df941 100644 --- a/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala +++ b/heron/api/tests/scala/org/apache/heron/streamlet/scala/impl/StreamletImplTest.scala @@ -25,6 +25,7 @@ import org.junit.Assert.{assertEquals, assertTrue} import org.apache.heron.resource.{ TestBasicBolt, TestBolt, + TestCustomOperator, TestWindowBolt } import org.apache.heron.streamlet.{ @@ -409,7 +410,6 @@ class StreamletImplTest extends BaseFunSuite { } test("StreamletImpl should support applyOperator operation on IStreamletRichOperator") { - val testOperator = new MyBoltOperator() val supplierStreamlet = builder .newSource(() => Random.nextDouble()) @@ -552,6 +552,51 @@ class StreamletImplTest extends BaseFunSuite { assertEquals(0, customStreamlet.getChildren.size()) } + test("StreamletImpl should support applyOperator operation on CustomOperator") { + val testOperator = new MyWindowBoltOperator() + val supplierStreamlet = builder + .newSource(() => Random.nextDouble()) + .setName("Supplier_Streamlet_1") + .setNumPartitions(3) + + supplierStreamlet + .map[Double] { num: Double => + num * 10 + } + .setName("Map_Streamlet_1") + .setNumPartitions(2) + .applyOperator(new TestCustomOperator) + .setName("CustomOperator_Streamlet_1") + .setNumPartitions(7) + + val supplierStreamletImpl = + supplierStreamlet.asInstanceOf[StreamletImpl[Double]] + assertEquals(1, supplierStreamletImpl.getChildren.size) + assertTrue( + supplierStreamletImpl + .getChildren(0) + .isInstanceOf[MapStreamlet[_, _]]) + val mapStreamlet = supplierStreamletImpl + .getChildren(0) + .asInstanceOf[MapStreamlet[Double, Double]] + assertEquals("Map_Streamlet_1", mapStreamlet.getName) + assertEquals(2, mapStreamlet.getNumPartitions) + assertEquals(1, mapStreamlet.getChildren.size()) + + assertTrue( + mapStreamlet + .getChildren() + .get(0) + .isInstanceOf[CustomStreamlet[_, _]]) + val customStreamlet = mapStreamlet + .getChildren() + .get(0) + .asInstanceOf[CustomStreamlet[Double, Double]] + assertEquals("CustomOperator_Streamlet_1", customStreamlet.getName) + assertEquals(7, customStreamlet.getNumPartitions) + assertEquals(0, customStreamlet.getChildren.size()) + } + test("StreamletImpl should support reduce operation") { val supplierStreamlet = builder .newSource(() => Random.nextInt(10))