Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Add custom operator support #3055

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -504,5 +504,4 @@ public <T> Streamlet<T> applyOperator(IStreamletOperator<R, T> operator) {
addChild(customStreamlet);
return customStreamlet;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package org.apache.heron.streamlet.impl.operators;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.heron.api.bolt.OutputCollector;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Tuple;
import org.apache.heron.api.tuple.Values;

/**
 * CustomOperator is the base class for all user defined operators.
 *
 * <p>Usage:
 * <ol>
 * <li>Create a user defined operator:
 * <pre>{@code
 * class MyOperator extends CustomOperator<String, String> {
 *   public MyOperator() {
 *     ...
 *   }
 *
 *   @Override
 *   public CustomOperatorOutput<String> process(String data) {
 *     ...
 *   }
 * }
 * }</pre>
 * Note that users can override low level bolt functions like getComponentConfiguration()
 * in order to implement more advanced features.</li>
 * <li>Use it in a Streamlet:
 * <pre>{@code
 * ....
 *   .applyOperator(new MyOperator())
 * ....
 * }</pre></li>
 * </ol>
 *
 * @param <R> type of the incoming data
 * @param <T> type of the emitted data
 */
public abstract class CustomOperator<R, T> extends StreamletOperator<R, T> {

  // Collector handed over in prepare(); used by execute() to emit/ack/fail.
  private OutputCollector outputCollector;

  /**
   * Process function to be implemented by all custom operators.
   * @param data The data object to be processed
   * @return a CustomOperatorOutput that contains process results and other flags.
   * Returning null is treated by execute() as a failed execution.
   */
  public abstract CustomOperatorOutput<T> process(R data);

  /**
   * Called when a task for this component is initialized within a worker on the cluster.
   * It provides the bolt with the environment in which the bolt executes.
   * @param heronConf The Heron configuration for this bolt. This is the configuration
   * provided to the topology merged in with cluster configuration on this machine.
   * @param context This object can be used to get information about this task's place
   * within the topology, including the task id and component id of this task, input and
   * output information, etc.
   * @param collector The collector is used to emit tuples from this bolt. It is saved as
   * an instance variable so that execute() can emit, ack and fail tuples.
   */
  @Override
  public void prepare(Map<String, Object> heronConf,
                      TopologyContext context,
                      OutputCollector collector) {
    this.outputCollector = collector;
  }

  /**
   * Implementation of execute() function to support type safety in Streamlet API.
   * The function is responsible for:
   * - convert incoming tuple to the specified type
   * - call the new process() function which has type safety support
   * - emit/ack/fail the results
   * @param tuple the incoming tuple; its first value is handed to process()
   */
  @SuppressWarnings("unchecked")
  @Override
  public void execute(Tuple tuple) {
    R data = (R) tuple.getValue(0);
    CustomOperatorOutput<T> results = process(data);

    // A null result carries no success information; fail the tuple explicitly instead of
    // dying with a NullPointerException and leaving the tuple neither acked nor failed.
    if (results == null || !results.isSuccessful()) {
      outputCollector.fail(tuple);
      return;
    }

    // When anchoring is requested, the incoming tuple is the single anchor of every output.
    List<Tuple> anchors = results.isAnchored() ? Arrays.asList(tuple) : null;
    emitResults(results.getData(), anchors);
    outputCollector.ack(tuple);
  }

  /**
   * Convert process results to tuples and emit out to the downstream
   * @param data results lists with corresponding stream id
   * @param anchors anchors to be used when emitting tuples
   */
  private void emitResults(Map<String, List<T>> data, List<Tuple> anchors) {
    if (data == null) {
      return;
    }
    for (Map.Entry<String, List<T>> entry : data.entrySet()) {
      List<T> values = entry.getValue();
      if (values == null) {
        // Nothing to emit for this stream id.
        continue;
      }
      String streamId = entry.getKey();
      for (T value : values) {
        outputCollector.emit(streamId, anchors, new Values(value));
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package org.apache.heron.streamlet.impl.operators;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.heron.api.utils.Utils;

/**
 * CustomOperatorOutput is the class for data returned by CustomOperators' process() function.
 * CustomOperator's execute() is responsible for converting the output to emit/ack/fail
 * calls in the low level API.
 *
 * Usage:
 * Assuming the results are already stored in a list of integers: data,
 *   return CustomOperatorOutput.succeed(data);
 * To report a failed execution:
 *   return CustomOperatorOutput.fail();
 *
 * @param <T> type of the output data objects
 */
public final class CustomOperatorOutput<T> {
  private final Map<String, List<T>> output;  // Stream id to output data to be emitted.
  private final boolean successful;           // If the execution succeeded?
  private boolean anchored;                   // If anchors should be added when emitting tuples?

  // Disable constructor. User should use the static functions below to create objects.
  // The success flag is passed explicitly so that it never depends on the shape of the data.
  private CustomOperatorOutput(Map<String, List<T>> data, boolean successful) {
    this.output = data;
    this.successful = successful;
    this.anchored = true;
  }

  /**
   * Get collected data
   * @return data to be emitted. The data is a map of stream id to list of objects.
   * It is null for a result created by fail()
   */
  public Map<String, List<T>> getData() {
    return output;
  }

  /**
   * Check successful flag
   * @return true if the execution succeeded. If not successful, fail() will be called
   * instead of ack() in bolt
   */
  public boolean isSuccessful() {
    return successful;
  }

  /**
   * Check anchored flag
   * @return true if the output data needs to be anchored when emitting
   */
  public boolean isAnchored() {
    return anchored;
  }

  /**
   * If the output data needs to be anchored or not
   * @param flag the anchored flag
   * @return this object itself
   */
  public CustomOperatorOutput<T> withAnchor(boolean flag) {
    anchored = flag;
    return this;
  }

  /* Util static functions. Users should use them to create objects */

  /**
   * Generate a successful CustomOperatorOutput object backed by a new, empty, mutable map
   * @return a successful CustomOperatorOutput object with empty output
   */
  public static <R> CustomOperatorOutput<R> create() {
    Map<String, List<R>> dataMap = new HashMap<String, List<R>>();
    return new CustomOperatorOutput<R>(dataMap, true);
  }

  /**
   * Generate a CustomOperatorOutput object with empty output
   * @return a CustomOperatorOutput object with empty output
   */
  public static <R> CustomOperatorOutput<R> succeed() {
    Map<String, List<R>> dataMap = new HashMap<String, List<R>>();
    return new CustomOperatorOutput<R>(dataMap, true);
  }

  /**
   * Generate a CustomOperatorOutput object with a single object in the default stream
   * @param data the data object to be added
   * @return the generated CustomOperatorOutput object
   */
  public static <R> CustomOperatorOutput<R> succeed(R data) {
    List<R> single = Arrays.asList(data);
    return succeed(single);
  }

  /**
   * Generate a CustomOperatorOutput object with a list of output objects
   * in the default stream
   * @param data the list of data to be added. A null list is treated as "no output"
   * @return the generated CustomOperatorOutput object
   */
  public static <R> CustomOperatorOutput<R> succeed(List<R> data) {
    Map<String, List<R>> dataMap = new HashMap<String, List<R>>();
    if (data != null) {
      dataMap.put(Utils.DEFAULT_STREAM_ID, data);
    }
    return new CustomOperatorOutput<R>(dataMap, true);
  }

  /**
   * Generate a CustomOperatorOutput object with a map of stream id to list of output objects
   * @param data the output data in a map of stream id to list of output objects.
   * A null map is treated as empty output, so the result is always successful
   * @return the generated CustomOperatorOutput object
   */
  public static <R> CustomOperatorOutput<R> succeed(Map<String, List<R>> data) {
    Map<String, List<R>> dataMap = data;
    if (dataMap == null) {
      dataMap = new HashMap<String, List<R>>();
    }
    return new CustomOperatorOutput<R>(dataMap, true);
  }

  /**
   * Generate a result that represents a fail result
   * @return a result that represents a fail result; its data is null
   */
  public static <R> CustomOperatorOutput<R> fail() {
    return new CustomOperatorOutput<R>(null, false);
  }
}
2 changes: 2 additions & 0 deletions heron/api/tests/java/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ java_tests(
"org.apache.heron.api.metric.LatencyStatAndMetricTest",
"org.apache.heron.api.bolt.BaseWindowedBoltTest",
"org.apache.heron.streamlet.impl.StreamletImplTest",
"org.apache.heron.streamlet.impl.operators.CustomOperatorOutputTest",
"org.apache.heron.streamlet.impl.operators.CustomOperatorTest",
"org.apache.heron.streamlet.impl.operators.JoinOperatorTest",
"org.apache.heron.streamlet.impl.operators.ReduceByKeyAndWindowOperatorTest",
"org.apache.heron.streamlet.impl.operators.GeneralReduceByKeyAndWindowOperatorTest",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.heron.resource;

import org.apache.heron.streamlet.impl.operators.CustomOperator;
import org.apache.heron.streamlet.impl.operators.CustomOperatorOutput;

/**
 * Test operator over numeric data:
 * - zero or negative input is reported as a failure
 * - input in (0, 100] is passed on to the next component
 * - input greater than 100 is dropped, but the execution still succeeds
 */
public class TestCustomOperator<T extends Number> extends CustomOperator<T, T> {
  @Override
  public CustomOperatorOutput<T> process(T data) {
    int value = data.intValue();

    // Error if it is 0 or negative
    if (value <= 0) {
      return CustomOperatorOutput.<T>fail();
    }

    // Ignore data if it is greater than 100
    if (value > 100) {
      return CustomOperatorOutput.<T>succeed();
    }

    // Positive and <= 100: emit to next component
    return CustomOperatorOutput.succeed(data);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.resource.TestBasicBolt;
import org.apache.heron.resource.TestBolt;
import org.apache.heron.resource.TestCustomOperator;
import org.apache.heron.resource.TestWindowBolt;
import org.apache.heron.streamlet.Config;
import org.apache.heron.streamlet.Context;
Expand Down Expand Up @@ -250,6 +251,20 @@ public void testCustomStreamletFromWindowBolt() throws Exception {
assertEquals(supplierStreamlet.getChildren().get(0), streamlet);
}

/**
 * Applies a TestCustomOperator to a supplier streamlet and checks that the result is a
 * CustomStreamlet with the configured number of partitions, registered as the only child
 * of the supplier streamlet.
 */
@Test
@SuppressWarnings("unchecked")
public void testCustomStreamlet() throws Exception {
  Streamlet<Double> baseStreamlet = StreamletImpl.createSupplierStreamlet(() -> Math.random());
  Streamlet<Double> streamlet = baseStreamlet.setNumPartitions(20)
      .applyOperator(new TestCustomOperator<Double>());
  assertTrue(streamlet instanceof CustomStreamlet);
  CustomStreamlet<Double, Double> mStreamlet = (CustomStreamlet<Double, Double>) streamlet;
  assertEquals(20, mStreamlet.getNumPartitions());
  SupplierStreamlet<Double> supplierStreamlet = (SupplierStreamlet<Double>) baseStreamlet;
  // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
  assertEquals(1, supplierStreamlet.getChildren().size());
  assertEquals(streamlet, supplierStreamlet.getChildren().get(0));
}

@Test
@SuppressWarnings("unchecked")
public void testSimpleBuild() throws Exception {
Expand Down
Loading